Compare commits

..

25 Commits

Author SHA1 Message Date
Anastasia Lubennikova
26e1e430c0 Check postgres version and ensure that public schema exists
before running GRANT query on it
2022-10-22 02:14:26 +03:00
Stas Kelvich
f3aac81d19 Merge pull request #2668 from neondatabase/main
Release 2022-10-21
2022-10-21 15:21:42 +03:00
Sergey Melnikov
2709878b8b Deploy scram proxies into new account (#2643) 2022-10-21 14:21:22 +03:00
Kirill Bulatov
39e4bdb99e Actualize tenant and timeline API modifiers (#2661)
* Actualize tenant and timeline API modifiers
* Use anyhow::Result explicitly
2022-10-21 10:58:43 +00:00
Stas Kelvich
979ad60c19 Merge pull request #2581 from neondatabase/main
Release 2022-10-07
2022-10-07 16:50:55 +03:00
Stas Kelvich
9316cb1b1f Merge pull request #2573 from neondatabase/main
Release 2022-10-06
2022-10-07 11:07:06 +03:00
Anastasia Lubennikova
e7939a527a Merge pull request #2377 from neondatabase/main
Release 2022-09-01
2022-09-01 20:20:44 +03:00
Arthur Petukhovsky
36d26665e1 Merge pull request #2299 from neondatabase/main
* Check for entire range during sasl validation (#2281)

* Gen2 GH runner (#2128)

* Re-add rustup override

* Try s3 bucket

* Set git version

* Use v4 cache key to prevent problems

* Switch to v5 for key

* Add second rustup fix

* Rebase

* Add kaniko steps

* Fix typo and set compress level

* Disable global run default

* Specify shell for step

* Change approach with kaniko

* Try less verbose shell spec

* Add submodule pull

* Add promote step

* Adjust dependency chain

* Try default swap again

* Use env

* Don't override aws key

* Make kaniko build conditional

* Specify runs on

* Try without dependency link

* Try soft fail

* Use image with git

* Try passing to next step

* Fix duplicate

* Try other approach

* Try other approach

* Fix typo

* Try other syntax

* Set env

* Adjust setup

* Try step 1

* Add link

* Try global env

* Fix mistake

* Debug

* Try other syntax

* Try other approach

* Change order

* Move output one step down

* Put output up one level

* Try other syntax

* Skip build

* Try output

* Re-enable build

* Try other syntax

* Skip middle step

* Update check

* Try first step of dockerhub push

* Update needs dependency

* Try explicit dir

* Add missing package

* Try other approach

* Try other approach

* Specify region

* Use with

* Try other approach

* Add debug

* Try other approach

* Set region

* Follow AWS example

* Try github approach

* Skip Qemu

* Try stdin

* Missing steps

* Add missing close

* Add echo debug

* Try v2 endpoint

* Use v1 endpoint

* Try without quotes

* Revert

* Try crane

* Add debug

* Split steps

* Fix duplicate

* Add shell step

* Conform to options

* Add verbose flag

* Try single step

* Try workaround

* First request fails hunch

* Try bullseye image

* Try other approach

* Adjust verbose level

* Try previous step

* Add more debug

* Remove debug step

* Remove rogue indent

* Try with larger image

* Add build tag step

* Update workflow for testing

* Add tag step for test

* Remove unused

* Update dependency chain

* Add ownership fix

* Use matrix for promote

* Force update

* Force build

* Remove unused

* Add new image

* Add missing argument

* Update dockerfile copy

* Update Dockerfile

* Update clone

* Update dockerfile

* Go to correct folder

* Use correct format

* Update dockerfile

* Remove cd

* Debug find where we are

* Add debug on first step

* Changedir to postgres

* Set workdir

* Use v1 approach

* Use other dependency

* Try other approach

* Try other approach

* Update dockerfile

* Update approach

* Update dockerfile

* Update approach

* Update dockerfile

* Update dockerfile

* Add workspace hack

* Update Dockerfile

* Update Dockerfile

* Update Dockerfile

* Change last step

* Cleanup pull in prep for review

* Force build images

* Add condition for latest tagging

* Use pinned version

* Try without name value

* Remove more names

* Shorten names

* Add kaniko comments

* Pin kaniko

* Pin crane and ecr helper

* Up one level

* Switch to pinned tag for rust image

* Force update for test

Co-authored-by: Rory de Zoete <rdezoete@RorysMacStudio.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@b04468bf-cdf4-41eb-9c94-aff4ca55e4bf.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@Rorys-Mac-Studio.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@4795e9ee-4f32-401f-85f3-f316263b62b8.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@2f8bc4e5-4ec2-4ea2-adb1-65d863c4a558.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@27565b2b-72d5-4742-9898-a26c9033e6f9.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@ecc96c26-c6c4-4664-be6e-34f7c3f89a3c.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@7caff3a5-bf03-4202-bd0e-f1a93c86bdae.fritz.box>

* Add missing step output, revert one deploy step (#2285)

* Add missing step output, revert one deploy step

* Conform to syntax

* Update approach

* Add missing value

* Add missing needs

Co-authored-by: Rory de Zoete <rdezoete@RorysMacStudio.fritz.box>

* Error for fatal not git repo (#2286)

Co-authored-by: Rory de Zoete <rdezoete@RorysMacStudio.fritz.box>

* Use main, not branch for ref check (#2288)

* Use main, not branch for ref check

* Add more debug

* Count main, not head

* Try new approach

* Conform to syntax

* Update approach

* Get full history

* Skip checkout

* Cleanup debug

* Remove more debug

Co-authored-by: Rory de Zoete <rdezoete@RorysMacStudio.fritz.box>

* Fix docker zombie process issue (#2289)

* Fix docker zombie process issue

* Init everywhere

Co-authored-by: Rory de Zoete <rdezoete@RorysMacStudio.fritz.box>

* Fix 1.63 clippy lints (#2282)

* split out timeline metrics, track layer map loading and size calculation

* reset rust cache for clippy run to avoid an ICE

additionally remove trailing whitespaces

* Rename pg_control_ffi.h to bindgen_deps.h, for clarity.

The pg_control_ffi.h name implies that it only includes stuff related to
pg_control.h. That's mostly true currently, but really the point of the
file is to include everything that we need to generate Rust definitions
from.

* Make local mypy behave like CI mypy (#2291)

* Fix flaky pageserver restarts in tests (#2261)

* Remove extra type aliases (#2280)

* Update cachepot endpoint (#2290)

* Update cachepot endpoint

* Update dockerfile & remove env

* Update image building process

* Cannot use metadata endpoint for this

* Update workflow

* Conform to kaniko syntax

* Update syntax

* Update approach

* Update dockerfiles

* Force update

* Update dockerfiles

* Update dockerfile

* Cleanup dockerfiles

* Update s3 test location

* Revert s3 experiment

* Add more debug

* Specify aws region

* Remove debug, add prefix

* Remove one more debug

Co-authored-by: Rory de Zoete <rdezoete@RorysMacStudio.fritz.box>

* workflows/benchmarking: increase timeout (#2294)

* Rework `init` in pageserver CLI  (#2272)

* Do not create initial tenant and timeline (adjust Python tests for that)
* Rework config handling during init, add --update-config to manage local config updates

* Fix: Always build images (#2296)

* Always build images

* Remove unused

Co-authored-by: Rory de Zoete <rdezoete@RorysMacStudio.fritz.box>

* Move auto-generated 'bindings' to a separate inner module.

Re-export only things that are used by other modules.

In the future, I'm imagining that we run bindgen twice, for Postgres
v14 and v15. The two sets of bindings would go into separate
'bindings_v14' and 'bindings_v15' modules.

Rearrange postgres_ffi modules.

Move function, to avoid Postgres version dependency in timelines.rs
Move function to generate a logical-message WAL record to postgres_ffi.

* fix cargo test

* Fix walreceiver and safekeeper bugs (#2295)

- There was an issue with zero commit_lsn `reason: LaggingWal { current_commit_lsn: 0/0, new_commit_lsn: 1/6FD90D38, threshold: 10485760 } }`. The problem was in `send_wal.rs`, where we initialized `end_pos = Lsn(0)` and in some cases sent it to the pageserver.
- IDENTIFY_SYSTEM previously returned `flush_lsn` as a physical end of WAL. Now it returns `flush_lsn` (as it was) to walproposer and `commit_lsn` to everyone else including pageserver.
- There was an issue with backoff where connection was cancelled right after initialization: `connected!` -> `safekeeper_handle_db: Connection cancelled` -> `Backoff: waiting 3 seconds`. The problem was in sleeping before establishing the connection. This is fixed by reworking retry logic.
- There was an issue with getting `NoKeepAlives` reason in a loop. The issue is probably the same as the previous.
- There was an issue with filtering safekeepers based on retry attempts, which could filter some safekeepers indefinetely. This is fixed by using retry cooldown duration instead of retry attempts.
- Some `send_wal.rs` connections failed with errors without context. This is fixed by adding a timeline to safekeepers errors.

New retry logic works like this:
- Every candidate has a `next_retry_at` timestamp and is not considered for connection until that moment
- When walreceiver connection is closed, we update `next_retry_at` using exponential backoff, increasing the cooldown on every disconnect.
- When `last_record_lsn` was advanced using the WAL from the safekeeper, we reset the retry cooldown and exponential backoff, allowing walreceiver to reconnect to the same safekeeper instantly.

* on safekeeper registration pass availability zone param (#2292)

Co-authored-by: Kirill Bulatov <kirill@neon.tech>
Co-authored-by: Rory de Zoete <33318916+zoete@users.noreply.github.com>
Co-authored-by: Rory de Zoete <rdezoete@RorysMacStudio.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@b04468bf-cdf4-41eb-9c94-aff4ca55e4bf.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@Rorys-Mac-Studio.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@4795e9ee-4f32-401f-85f3-f316263b62b8.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@2f8bc4e5-4ec2-4ea2-adb1-65d863c4a558.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@27565b2b-72d5-4742-9898-a26c9033e6f9.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@ecc96c26-c6c4-4664-be6e-34f7c3f89a3c.fritz.box>
Co-authored-by: Rory de Zoete <rdezoete@7caff3a5-bf03-4202-bd0e-f1a93c86bdae.fritz.box>
Co-authored-by: Dmitry Rodionov <dmitry@neon.tech>
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
Co-authored-by: bojanserafimov <bojan.serafimov7@gmail.com>
Co-authored-by: Alexander Bayandin <alexander@neon.tech>
Co-authored-by: Anastasia Lubennikova <anastasia@neon.tech>
Co-authored-by: Anton Galitsyn <agalitsyn@users.noreply.github.com>
2022-08-18 15:32:33 +03:00
Arthur Petukhovsky
873347f977 Merge pull request #2275 from neondatabase/main
* github/workflows: Fix git dubious ownership (#2223)

* Move relation size cache from WalIngest to DatadirTimeline (#2094)

* Move relation sie cache to layered timeline

* Fix obtaining current LSN for relation size cache

* Resolve merge conflicts

* Resolve merge conflicts

* Reestore 'lsn' field in DatadirModification

* adjust DatadirModification lsn in ingest_record

* Fix formatting

* Pass lsn to get_relsize

* Fix merge conflict

* Update pageserver/src/pgdatadir_mapping.rs

Co-authored-by: Heikki Linnakangas <heikki@zenith.tech>

* Update pageserver/src/pgdatadir_mapping.rs

Co-authored-by: Heikki Linnakangas <heikki@zenith.tech>

Co-authored-by: Heikki Linnakangas <heikki@zenith.tech>

* refactor: replace lazy-static with once-cell (#2195)

- Replacing all the occurrences of lazy-static with `once-cell::sync::Lazy`
- fixes #1147

Signed-off-by: Ankur Srivastava <best.ankur@gmail.com>

* Add more buckets to pageserver latency metrics (#2225)

* ignore record property warning to fix benchmarks

* increase statement timeout

* use event so it fires only if workload thread successfully finished

* remove debug log

* increase timeout to pass test with real s3

* avoid duplicate parameter, increase timeout

* Major migration script (#2073)

This script can be used to migrate a tenant across breaking storage versions, or (in the future) upgrading postgres versions. See the comment at the top for an overview.

Co-authored-by: Anastasia Lubennikova <anastasia@neon.tech>

* Fix etcd typos

* Fix links to safekeeper protocol docs. (#2188)

safekeeper/README_PROTO.md was moved to docs/safekeeper-protocol.md in
commit 0b14fdb078, as part of reorganizing the docs into 'mdbook' format.

Fixes issue #1475. Thanks to @banks for spotting the outdated references.

In addition to fixing the above issue, this patch also fixes other broken links as a result of 0b14fdb078. See https://github.com/neondatabase/neon/pull/2188#pullrequestreview-1055918480.

Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
Co-authored-by: Thang Pham <thang@neon.tech>

* Update CONTRIBUTING.md

* Update CONTRIBUTING.md

* support node id and remote storage params in docker_entrypoint.sh

* Safe truncate (#2218)

* Move relation sie cache to layered timeline

* Fix obtaining current LSN for relation size cache

* Resolve merge conflicts

* Resolve merge conflicts

* Reestore 'lsn' field in DatadirModification

* adjust DatadirModification lsn in ingest_record

* Fix formatting

* Pass lsn to get_relsize

* Fix merge conflict

* Update pageserver/src/pgdatadir_mapping.rs

Co-authored-by: Heikki Linnakangas <heikki@zenith.tech>

* Update pageserver/src/pgdatadir_mapping.rs

Co-authored-by: Heikki Linnakangas <heikki@zenith.tech>

* Check if relation exists before trying to truncat it

refer #1932

* Add test reporducing FSM truncate problem

Co-authored-by: Heikki Linnakangas <heikki@zenith.tech>

* Fix exponential backoff values

* Update back `vendor/postgres` back; it was changed accidentally. (#2251)

Commit 4227cfc96e accidentally reverted vendor/postgres to an older
version. Update it back.

* Add pageserver checkpoint_timeout option.

To flush inmemory layer eventually when no new data arrives, which helps
safekeepers to suspend activity (stop pushing to the broker). Default 10m should
be ok.

* Share exponential backoff code and fix logic for delete task failure (#2252)

* Fix bug when import large (>1GB) relations (#2172)

Resolves #2097 

- use timeline modification's `lsn` and timeline's `last_record_lsn` to determine the corresponding LSN to query data in `DatadirModification::get`
- update `test_import_from_pageserver`. Split the test into 2 variants: `small` and `multisegment`. 
  + `small` is the old test
  + `multisegment` is to simulate #2097 by using a larger number of inserted rows to create multiple segment files of a relation. `multisegment` is configured to only run with a `release` build

* Fix timeline physical size flaky tests (#2244)

Resolves #2212.

- use `wait_for_last_flush_lsn` in `test_timeline_physical_size_*` tests

## Context
Need to wait for the pageserver to catch up with the compute's last flush LSN because during the timeline physical size API call, it's possible that there are running `LayerFlushThread` threads. These threads flush new layers into disk and hence update the physical size. This results in a mismatch between the physical size reported by the API and the actual physical size on disk.

### Note
The `LayerFlushThread` threads are processed **concurrently**, so it's possible that the above error still persists even with this patch. However, making the tests wait to finish processing all the WALs (not flushing) before calculating the physical size should help reduce the "flakiness" significantly

* postgres_ffi/waldecoder: validate more header fields

* postgres_ffi/waldecoder: remove unused startlsn

* postgres_ffi/waldecoder: introduce explicit `enum State`

Previously it was emulated with a combination of nullable fields.
This change should make the logic more readable.

* disable `test_import_from_pageserver_multisegment` (#2258)

This test failed consistently on `main` now. It's better to temporarily disable it to avoid blocking others' PRs while investigating the root cause for the test failure.

See: #2255, #2256

* get_binaries uses DOCKER_TAG taken from docker image build step (#2260)

* [proxy] Rework wire format of the password hack and some errors (#2236)

The new format has a few benefits: it's shorter, simpler and
human-readable as well. We don't use base64 anymore, since
url encoding got us covered.

We also show a better error in case we couldn't parse the
payload; the users should know it's all about passing the
correct project name.

* test_runner/pg_clients: collect docker logs (#2259)

* get_binaries script fix (#2263)

* get_binaries uses DOCKER_TAG taken from docker image build step

* remove docker tag discovery at all and fix get_binaries for version variable

* Better storage sync logs (#2268)

* Find end of WAL on safekeepers using WalStreamDecoder.

We could make it inside wal_storage.rs, but taking into account that
 - wal_storage.rs reading is async
 - we don't need s3 here
 - error handling is different; error during decoding is normal
I decided to put it separately.

Test
cargo test test_find_end_of_wal_last_crossing_segment
prepared earlier by @yeputons passes now.

Fixes https://github.com/neondatabase/neon/issues/544
      https://github.com/neondatabase/cloud/issues/2004
Supersedes https://github.com/neondatabase/neon/pull/2066

* Improve walreceiver logic (#2253)

This patch makes walreceiver logic more complicated, but it should work better in most cases. Added `test_wal_lagging` to test scenarios where alive safekeepers can lag behind other alive safekeepers.

- There was a bug which looks like `etcd_info.timeline.commit_lsn > Some(self.local_timeline.get_last_record_lsn())` filtered all safekeepers in some strange cases. I removed this filter, it should probably help with #2237
- Now walreceiver_connection reports status, including commit_lsn. This allows keeping safekeeper connection even when etcd is down.
- Safekeeper connection now fails if pageserver doesn't receive safekeeper messages for some time. Usually safekeeper sends messages at least once per second.
- `LaggingWal` check now uses `commit_lsn` directly from safekeeper. This fixes the issue with often reconnects, when compute generates WAL really fast.
- `NoWalTimeout` is rewritten to trigger only when we know about the new WAL and the connected safekeeper doesn't stream any WAL. This allows setting a small `lagging_wal_timeout` because it will trigger only when we observe that the connected safekeeper has stuck.

* increase timeout in wait_for_upload to avoid spurious failures when testing with real s3

* Bump vendor/postgres to include XLP_FIRST_IS_CONTRECORD fix. (#2274)

* Set up a workflow to run pgbench against captest (#2077)

Signed-off-by: Ankur Srivastava <best.ankur@gmail.com>
Co-authored-by: Alexander Bayandin <alexander@neon.tech>
Co-authored-by: Konstantin Knizhnik <knizhnik@garret.ru>
Co-authored-by: Heikki Linnakangas <heikki@zenith.tech>
Co-authored-by: Ankur Srivastava <ansrivas@users.noreply.github.com>
Co-authored-by: bojanserafimov <bojan.serafimov7@gmail.com>
Co-authored-by: Dmitry Rodionov <dmitry@neon.tech>
Co-authored-by: Anastasia Lubennikova <anastasia@neon.tech>
Co-authored-by: Kirill Bulatov <kirill@neon.tech>
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
Co-authored-by: Thang Pham <thang@neon.tech>
Co-authored-by: Stas Kelvich <stas.kelvich@gmail.com>
Co-authored-by: Arseny Sher <sher-ars@yandex.ru>
Co-authored-by: Egor Suvorov <egor@neon.tech>
Co-authored-by: Andrey Taranik <andrey@cicd.team>
Co-authored-by: Dmitry Ivanov <ivadmi5@gmail.com>
2022-08-15 21:30:45 +03:00
Arthur Petukhovsky
e814ac16f9 Merge pull request #2219 from neondatabase/main
Release 2022-08-04
2022-08-04 20:06:34 +03:00
Heikki Linnakangas
ad3055d386 Merge pull request #2203 from neondatabase/release-uuid-ossp
Deploy new storage and compute version to production

Release 2022-08-02
2022-08-02 15:08:14 +03:00
Heikki Linnakangas
94e03eb452 Merge remote-tracking branch 'origin/main' into 'release'
Release 2022-08-01
2022-08-02 12:43:49 +03:00
Sergey Melnikov
380f26ef79 Merge pull request #2170 from neondatabase/main (Release 2022-07-28)
Release 2022-07-28
2022-07-28 14:16:52 +03:00
Arthur Petukhovsky
3c5b7f59d7 Merge pull request #2119 from neondatabase/main
Release 2022-07-19
2022-07-19 11:58:48 +03:00
Arthur Petukhovsky
fee89f80b5 Merge pull request #2115 from neondatabase/main-2022-07-18
Release 2022-07-18
2022-07-18 19:21:11 +03:00
Arthur Petukhovsky
41cce8eaf1 Merge remote-tracking branch 'origin/release' into main-2022-07-18 2022-07-18 18:21:20 +03:00
Alexey Kondratov
f88fe0218d Merge pull request #1842 from neondatabase/release-deploy-hotfix
[HOTFIX] Release deploy fix

This PR uses this branch neondatabase/postgres#171 and several required commits from the main to use only locally built compute-tools. This should allow us to rollout safekeepers sync issue fix on prod
2022-06-01 11:04:30 +03:00
Alexey Kondratov
cc856eca85 Install missing openssl packages in the Github Actions workflow 2022-05-31 21:31:31 +02:00
Alexey Kondratov
cf350c6002 Use :local compute-tools tag to build compute-node image 2022-05-31 21:31:16 +02:00
Arseny Sher
0ce6b6a0a3 Merge pull request #1836 from neondatabase/release-hotfix-basebackup-lsn-page-boundary
Bump vendor/postgres to hotfix basebackup LSN comparison.
2022-05-31 16:54:03 +04:00
Arseny Sher
73f247d537 Bump vendor/postgres to hotfix basebackup LSN comparison. 2022-05-31 16:00:50 +04:00
Andrey Taranik
960be82183 Merge pull request #1792 from neondatabase/main
Release 2202-05-25 (second)
2022-05-25 16:37:57 +03:00
Andrey Taranik
806e5a6c19 Merge pull request #1787 from neondatabase/main
Release 2022-05-25
2022-05-25 13:34:11 +03:00
Alexey Kondratov
8d5df07cce Merge pull request #1385 from zenithdb/main
Release main 2022-03-22
2022-03-22 05:04:34 -05:00
Andrey Taranik
df7a9d1407 release fix 2022-03-16 (#1375) 2022-03-17 00:43:28 +03:00
8 changed files with 296 additions and 496 deletions

View File

@@ -0,0 +1,31 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
image:
repository: neondatabase/neon
settings:
authBackend: "console"
authEndpoint: "http://console-staging.local/management/api/v2"
domain: "*.us-east-2.aws.neon.build"
# -- Additional labels for neon-proxy pods
podLabels:
zenith_service: proxy-scram
zenith_env: dev
zenith_region: us-east-2
zenith_region_slug: us-east-2
exposedService:
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
external-dns.alpha.kubernetes.io/hostname: us-east-2.aws.neon.build
#metrics:
# enabled: true
# serviceMonitor:
# enabled: true
# selector:
# release: kube-prometheus-stack

View File

@@ -825,3 +825,31 @@ jobs:
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
helm upgrade ${{ matrix.proxy_job }} neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
helm upgrade ${{ matrix.proxy_job }}-scram neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
deploy-proxy-new:
runs-on: dev
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
# Compute image isn't strictly required for proxy deploy, but let's still wait for it to run all deploy jobs consistently.
needs: [ push-docker-hub, calculate-deploy-targets, tag, regress-tests ]
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
defaults:
run:
shell: bash
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: true
fetch-depth: 0
- name: Configure environment
run: |
helm repo add neondatabase https://neondatabase.github.io/helm-charts
aws --region us-east-2 eks update-kubeconfig --name dev-us-east-2-beta --role-arn arn:aws:iam::369495373322:role/github-runner
- name: Re-deploy proxy
run: |
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
helm upgrade neon-proxy-scram neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install -f .github/helm-values/dev-us-east-2-beta.neon-proxy-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s

View File

@@ -425,7 +425,28 @@ pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> {
// Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user.
// This is needed since postgres 15, where this privilege is removed by default.
let grant_query: String = "GRANT CREATE ON SCHEMA public TO web_access".to_string();
let grant_query = "DO $$\n\
BEGIN\n\
IF EXISTS(\n\
SELECT nspname\n\
FROM pg_catalog.pg_namespace\n\
WHERE nspname = 'public'\n\
) AND\n\
version() LIKE 'PostgreSQL 15%'\n\
THEN\n\
IF EXISTS(\n\
SELECT rolname\n\
FROM pg_catalog.pg_roles\n\
WHERE rolname = 'web_access'\n\
)\n\
THEN\n\
GRANT CREATE ON SCHEMA public TO web_access;\n\
END IF;\n\
END IF;\n\
END\n\
$$;"
.to_string();
info!("grant query for db {} : {}", &db.name, &grant_query);
db_client.simple_query(&grant_query)?;
}

View File

@@ -11,7 +11,7 @@
//! parent timeline, and the last LSN that has been written to disk.
//!
use anyhow::{bail, ensure, Context, Result};
use anyhow::{bail, ensure, Context};
use tokio::sync::watch;
use tracing::*;
use utils::crashsafe::path_with_suffix_extension;
@@ -25,7 +25,6 @@ use std::fs::File;
use std::fs::OpenOptions;
use std::io;
use std::io::Write;
use std::num::NonZeroU64;
use std::ops::Bound::Included;
use std::path::Path;
use std::path::PathBuf;
@@ -292,7 +291,7 @@ impl TimelineUninitMark {
Ok(())
}
fn delete_mark_file_if_present(&mut self) -> Result<(), anyhow::Error> {
fn delete_mark_file_if_present(&mut self) -> anyhow::Result<()> {
let uninit_mark_file = &self.uninit_mark_path;
let uninit_mark_parent = uninit_mark_file
.parent()
@@ -470,7 +469,7 @@ impl Tenant {
horizon: u64,
pitr: Duration,
checkpoint_before_gc: bool,
) -> Result<GcResult> {
) -> anyhow::Result<GcResult> {
let timeline_str = target_timeline_id
.map(|x| x.to_string())
.unwrap_or_else(|| "-".to_string());
@@ -486,7 +485,7 @@ impl Tenant {
/// This function is periodically called by compactor task.
/// Also it can be explicitly requested per timeline through page server
/// api's 'compact' command.
pub fn compaction_iteration(&self) -> Result<()> {
pub fn compaction_iteration(&self) -> anyhow::Result<()> {
// Scan through the hashmap and collect a list of all the timelines,
// while holding the lock. Then drop the lock and actually perform the
// compactions. We don't want to block everything else while the
@@ -510,7 +509,7 @@ impl Tenant {
///
/// Used at graceful shutdown.
///
pub fn checkpoint(&self) -> Result<()> {
pub fn checkpoint(&self) -> anyhow::Result<()> {
// Scan through the hashmap and collect a list of all the timelines,
// while holding the lock. Then drop the lock and actually perform the
// checkpoints. We don't want to block everything else while the
@@ -681,7 +680,7 @@ impl Tenant {
/// before the children.
fn tree_sort_timelines(
timelines: HashMap<TimelineId, TimelineMetadata>,
) -> Result<Vec<(TimelineId, TimelineMetadata)>> {
) -> anyhow::Result<Vec<(TimelineId, TimelineMetadata)>> {
let mut result = Vec::with_capacity(timelines.len());
let mut now = Vec::with_capacity(timelines.len());
@@ -784,27 +783,6 @@ impl Tenant {
.unwrap_or(self.conf.default_tenant_conf.pitr_interval)
}
pub fn get_wal_receiver_connect_timeout(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.walreceiver_connect_timeout
.unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout)
}
pub fn get_lagging_wal_timeout(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.lagging_wal_timeout
.unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout)
}
pub fn get_max_lsn_wal_lag(&self) -> NonZeroU64 {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.max_lsn_wal_lag
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag)
}
pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
self.tenant_conf.write().unwrap().update(&new_tenant_conf);
}
@@ -836,7 +814,7 @@ impl Tenant {
))
}
pub fn new(
pub(super) fn new(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
@@ -859,7 +837,7 @@ impl Tenant {
}
/// Locate and load config
pub fn load_tenant_config(
pub(super) fn load_tenant_config(
conf: &'static PageServerConf,
tenant_id: TenantId,
) -> anyhow::Result<TenantConfOpt> {
@@ -901,7 +879,7 @@ impl Tenant {
Ok(tenant_conf)
}
pub fn persist_tenant_config(
pub(super) fn persist_tenant_config(
target_config_path: &Path,
tenant_conf: TenantConfOpt,
first_save: bool,
@@ -994,7 +972,7 @@ impl Tenant {
horizon: u64,
pitr: Duration,
checkpoint_before_gc: bool,
) -> Result<GcResult> {
) -> anyhow::Result<GcResult> {
let mut totals: GcResult = Default::default();
let now = Instant::now();
@@ -1411,7 +1389,7 @@ fn run_initdb(
conf: &'static PageServerConf,
initdb_target_dir: &Path,
pg_version: u32,
) -> Result<()> {
) -> anyhow::Result<()> {
let initdb_bin_path = conf.pg_bin_dir(pg_version)?.join("initdb");
let initdb_lib_dir = conf.pg_lib_dir(pg_version)?;
info!(
@@ -1457,7 +1435,7 @@ impl Drop for Tenant {
}
}
/// Dump contents of a layer file to stdout.
pub fn dump_layerfile_from_path(path: &Path, verbose: bool) -> Result<()> {
pub fn dump_layerfile_from_path(path: &Path, verbose: bool) -> anyhow::Result<()> {
use std::os::unix::fs::FileExt;
// All layer files start with a two-byte "magic" value, to identify the kind of
@@ -1562,13 +1540,13 @@ pub mod harness {
}
impl<'a> TenantHarness<'a> {
pub fn create(test_name: &'static str) -> Result<Self> {
pub fn create(test_name: &'static str) -> anyhow::Result<Self> {
Self::create_internal(test_name, false)
}
pub fn create_exclusive(test_name: &'static str) -> Result<Self> {
pub fn create_exclusive(test_name: &'static str) -> anyhow::Result<Self> {
Self::create_internal(test_name, true)
}
fn create_internal(test_name: &'static str, exclusive: bool) -> Result<Self> {
fn create_internal(test_name: &'static str, exclusive: bool) -> anyhow::Result<Self> {
let lock_guard = if exclusive {
(None, Some(LOCK.write().unwrap()))
} else {
@@ -1602,7 +1580,7 @@ pub mod harness {
self.try_load().expect("failed to load test tenant")
}
pub fn try_load(&self) -> Result<Tenant> {
pub fn try_load(&self) -> anyhow::Result<Tenant> {
let walredo_mgr = Arc::new(TestRedoManager);
let tenant = Tenant::new(
@@ -1682,7 +1660,7 @@ pub mod harness {
},
records.len()
);
println!("{}", s);
println!("{s}");
Ok(TEST_IMG(&s))
}
@@ -1706,7 +1684,7 @@ mod tests {
Lazy::new(|| Key::from_slice(&hex!("112222222233333333444444445500000001")));
#[test]
fn test_basic() -> Result<()> {
fn test_basic() -> anyhow::Result<()> {
let tenant = TenantHarness::create("test_basic")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -1730,7 +1708,7 @@ mod tests {
}
#[test]
fn no_duplicate_timelines() -> Result<()> {
fn no_duplicate_timelines() -> anyhow::Result<()> {
let tenant = TenantHarness::create("no_duplicate_timelines")?.load();
let _ = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -1761,7 +1739,7 @@ mod tests {
/// Test branch creation
///
#[test]
fn test_branch() -> Result<()> {
fn test_branch() -> anyhow::Result<()> {
let tenant = TenantHarness::create("test_branch")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -1814,7 +1792,7 @@ mod tests {
Ok(())
}
fn make_some_layers(tline: &Timeline, start_lsn: Lsn) -> Result<()> {
fn make_some_layers(tline: &Timeline, start_lsn: Lsn) -> anyhow::Result<()> {
let mut lsn = start_lsn;
#[allow(non_snake_case)]
{
@@ -1856,7 +1834,7 @@ mod tests {
}
#[test]
fn test_prohibit_branch_creation_on_garbage_collected_data() -> Result<()> {
fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
let tenant =
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
.load();
@@ -1888,7 +1866,7 @@ mod tests {
}
#[test]
fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> Result<()> {
fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> anyhow::Result<()> {
let tenant =
TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?.load();
@@ -1915,7 +1893,7 @@ mod tests {
// FIXME: This currently fails to error out. Calling GC doesn't currently
// remove the old value, we'd need to work a little harder
#[test]
fn test_prohibit_get_for_garbage_collected_data() -> Result<()> {
fn test_prohibit_get_for_garbage_collected_data() -> anyhow::Result<()> {
let repo =
RepoHarness::create("test_prohibit_get_for_garbage_collected_data")?
.load();
@@ -1935,7 +1913,7 @@ mod tests {
*/
#[test]
fn test_retain_data_in_parent_which_is_needed_for_child() -> Result<()> {
fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
let tenant =
TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?.load();
let tline = tenant
@@ -1954,7 +1932,7 @@ mod tests {
Ok(())
}
#[test]
fn test_parent_keeps_data_forever_after_branching() -> Result<()> {
fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
let tenant =
TenantHarness::create("test_parent_keeps_data_forever_after_branching")?.load();
let tline = tenant
@@ -1982,7 +1960,7 @@ mod tests {
}
#[test]
fn timeline_load() -> Result<()> {
fn timeline_load() -> anyhow::Result<()> {
const TEST_NAME: &str = "timeline_load";
let harness = TenantHarness::create(TEST_NAME)?;
{
@@ -2003,7 +1981,7 @@ mod tests {
}
#[test]
fn timeline_load_with_ancestor() -> Result<()> {
fn timeline_load_with_ancestor() -> anyhow::Result<()> {
const TEST_NAME: &str = "timeline_load_with_ancestor";
let harness = TenantHarness::create(TEST_NAME)?;
// create two timelines
@@ -2042,7 +2020,7 @@ mod tests {
}
#[test]
fn corrupt_metadata() -> Result<()> {
fn corrupt_metadata() -> anyhow::Result<()> {
const TEST_NAME: &str = "corrupt_metadata";
let harness = TenantHarness::create(TEST_NAME)?;
let tenant = harness.load();
@@ -2084,7 +2062,7 @@ mod tests {
}
#[test]
fn test_images() -> Result<()> {
fn test_images() -> anyhow::Result<()> {
let tenant = TenantHarness::create("test_images")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -2136,7 +2114,7 @@ mod tests {
// repeat 50 times.
//
#[test]
fn test_bulk_insert() -> Result<()> {
fn test_bulk_insert() -> anyhow::Result<()> {
let tenant = TenantHarness::create("test_bulk_insert")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -2178,7 +2156,7 @@ mod tests {
}
#[test]
fn test_random_updates() -> Result<()> {
fn test_random_updates() -> anyhow::Result<()> {
let tenant = TenantHarness::create("test_random_updates")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -2250,7 +2228,7 @@ mod tests {
}
#[test]
fn test_traverse_branches() -> Result<()> {
fn test_traverse_branches() -> anyhow::Result<()> {
let tenant = TenantHarness::create("test_traverse_branches")?.load();
let mut tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -2331,7 +2309,7 @@ mod tests {
}
#[test]
fn test_traverse_ancestors() -> Result<()> {
fn test_traverse_ancestors() -> anyhow::Result<()> {
let tenant = TenantHarness::create("test_traverse_ancestors")?.load();
let mut tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?

View File

@@ -1,6 +1,6 @@
//!
use anyhow::{anyhow, bail, ensure, Context, Result};
use anyhow::{anyhow, bail, ensure, Context};
use bytes::Bytes;
use fail::fail_point;
use itertools::Itertools;
@@ -307,10 +307,6 @@ pub struct GcInfo {
/// Public interface functions
impl Timeline {
//------------------------------------------------------------------------------
// Public GET functions
//------------------------------------------------------------------------------
/// Get the LSN where this branch was created
pub fn get_ancestor_lsn(&self) -> Lsn {
self.ancestor_lsn
@@ -445,7 +441,7 @@ impl Timeline {
&self,
lsn: Lsn,
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
) -> Result<()> {
) -> anyhow::Result<()> {
ensure!(
lsn >= **latest_gc_cutoff_lsn,
"LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
@@ -455,12 +451,6 @@ impl Timeline {
Ok(())
}
//------------------------------------------------------------------------------
// Public PUT functions, to update the repository with new page versions.
//
// These are called by the WAL receiver to digest WAL records.
//------------------------------------------------------------------------------
/// Flush to disk all data that was written with the put_* functions
///
/// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
@@ -479,6 +469,91 @@ impl Timeline {
}
}
pub fn compact(&self) -> anyhow::Result<()> {
let last_record_lsn = self.get_last_record_lsn();
// Last record Lsn could be zero in case the timelie was just created
if !last_record_lsn.is_valid() {
warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
return Ok(());
}
//
// High level strategy for compaction / image creation:
//
// 1. First, calculate the desired "partitioning" of the
// currently in-use key space. The goal is to partition the
// key space into roughly fixed-size chunks, but also take into
// account any existing image layers, and try to align the
// chunk boundaries with the existing image layers to avoid
// too much churn. Also try to align chunk boundaries with
// relation boundaries. In principle, we don't know about
// relation boundaries here, we just deal with key-value
// pairs, and the code in pgdatadir_mapping.rs knows how to
// map relations into key-value pairs. But in practice we know
// that 'field6' is the block number, and the fields 1-5
// identify a relation. This is just an optimization,
// though.
//
// 2. Once we know the partitioning, for each partition,
// decide if it's time to create a new image layer. The
// criteria is: there has been too much "churn" since the last
// image layer? The "churn" is fuzzy concept, it's a
// combination of too many delta files, or too much WAL in
// total in the delta file. Or perhaps: if creating an image
// file would allow to delete some older files.
//
// 3. After that, we compact all level0 delta files if there
// are too many of them. While compacting, we also garbage
// collect any page versions that are no longer needed because
// of the new image layers we created in step 2.
//
// TODO: This high level strategy hasn't been implemented yet.
// Below are functions compact_level0() and create_image_layers()
// but they are a bit ad hoc and don't quite work like it's explained
// above. Rewrite it.
let _layer_removal_cs = self.layer_removal_cs.lock().unwrap();
let target_file_size = self.get_checkpoint_distance();
// Define partitioning schema if needed
match self.repartition(
self.get_last_record_lsn(),
self.get_compaction_target_size(),
) {
Ok((partitioning, lsn)) => {
// 2. Create new image layers for partitions that have been modified
// "enough".
let layer_paths_to_upload = self.create_image_layers(&partitioning, lsn, false)?;
if !layer_paths_to_upload.is_empty()
&& self.upload_layers.load(atomic::Ordering::Relaxed)
{
storage_sync::schedule_layer_upload(
self.tenant_id,
self.timeline_id,
layer_paths_to_upload,
None,
);
}
// 3. Compact
let timer = self.metrics.compact_time_histo.start_timer();
self.compact_level0(target_file_size)?;
timer.stop_and_record();
}
Err(err) => {
// no partitioning? This is normal, if the timeline was just created
// as an empty timeline. Also in unit tests, when we use the timeline
// as a simple key-value store, ignoring the datadir layout. Log the
// error but continue.
error!("could not compact, repartitioning keyspace failed: {err:?}");
}
};
Ok(())
}
/// Mutate the timeline with a [`TimelineWriter`].
pub fn writer(&self) -> TimelineWriter<'_> {
TimelineWriter {
@@ -486,6 +561,80 @@ impl Timeline {
_write_guard: self.write_lock.lock().unwrap(),
}
}
/// Retrieve current logical size of the timeline.
///
/// The size could be lagging behind the actual number, in case
/// the initial size calculation has not been run (gets triggered on the first size access).
pub fn get_current_logical_size(self: &Arc<Self>) -> anyhow::Result<u64> {
let current_size = self.current_logical_size.current_size()?;
debug!("Current size: {current_size:?}");
let size = current_size.size();
if let (CurrentLogicalSize::Approximate(_), Some(init_lsn)) =
(current_size, self.current_logical_size.initial_part_end)
{
self.try_spawn_size_init_task(init_lsn);
}
Ok(size)
}
/// Check if more than 'checkpoint_distance' of WAL has been accumulated in
/// the in-memory layer, and initiate flushing it if so.
///
/// Also flush after a period of time without new data -- it helps
/// safekeepers to regard pageserver as caught up and suspend activity.
pub fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
let last_lsn = self.get_last_record_lsn();
let layers = self.layers.read().unwrap();
if let Some(open_layer) = &layers.open_layer {
let open_layer_size = open_layer.size()?;
drop(layers);
let last_freeze_at = self.last_freeze_at.load();
let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
let distance = last_lsn.widening_sub(last_freeze_at);
// Checkpointing the open layer can be triggered by layer size or LSN range.
// S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
// we want to stay below that with a big margin. The LSN distance determines how
// much WAL the safekeepers need to store.
if distance >= self.get_checkpoint_distance().into()
|| open_layer_size > self.get_checkpoint_distance()
|| (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
{
info!(
"check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
distance,
open_layer_size,
last_freeze_ts.elapsed()
);
self.freeze_inmem_layer(true);
self.last_freeze_at.store(last_lsn);
*(self.last_freeze_ts.write().unwrap()) = Instant::now();
// Launch a task to flush the frozen layer to disk, unless
// a task was already running. (If the task was running
// at the time that we froze the layer, it must've seen the
// the layer we just froze before it exited; see comments
// in flush_frozen_layers())
if let Ok(guard) = self.layer_flush_lock.try_lock() {
drop(guard);
let self_clone = Arc::clone(self);
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::LayerFlushTask,
Some(self.tenant_id),
Some(self.timeline_id),
"layer flush task",
false,
async move { self_clone.flush_frozen_layers(false) },
);
}
}
}
Ok(())
}
}
// Private functions
@@ -529,7 +678,7 @@ impl Timeline {
///
/// Loads the metadata for the timeline into memory, but not the layer map.
#[allow(clippy::too_many_arguments)]
pub fn new(
pub(super) fn new(
conf: &'static PageServerConf,
tenant_conf: Arc<RwLock<TenantConfOpt>>,
metadata: TimelineMetadata,
@@ -602,7 +751,7 @@ impl Timeline {
result
}
pub fn launch_wal_receiver(self: &Arc<Self>) {
pub(super) fn launch_wal_receiver(self: &Arc<Self>) {
if !is_etcd_client_initialized() {
if cfg!(test) {
info!("not launching WAL receiver because etcd client hasn't been initialized");
@@ -641,7 +790,7 @@ impl Timeline {
/// Scan the timeline directory to populate the layer map.
/// Returns all timeline-related files that were found and loaded.
///
pub fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
pub(super) fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
let mut layers = self.layers.write().unwrap();
let mut num_layers = 0;
@@ -727,30 +876,12 @@ impl Timeline {
Ok(())
}
pub fn layer_removal_guard(&self) -> anyhow::Result<MutexGuard<()>> {
pub(super) fn layer_removal_guard(&self) -> anyhow::Result<MutexGuard<()>> {
self.layer_removal_cs
.try_lock()
.map_err(|e| anyhow!("cannot lock compaction critical section {e}"))
}
/// Retrieve current logical size of the timeline.
///
/// The size could be lagging behind the actual number, in case
/// the initial size calculation has not been run (gets triggered on the first size access).
pub fn get_current_logical_size(self: &Arc<Self>) -> anyhow::Result<u64> {
let current_size = self.current_logical_size.current_size()?;
debug!("Current size: {current_size:?}");
let size = current_size.size();
if let (CurrentLogicalSize::Approximate(_), Some(init_lsn)) =
(current_size, self.current_logical_size.initial_part_end)
{
self.try_spawn_size_init_task(init_lsn);
}
Ok(size)
}
fn try_spawn_size_init_task(self: &Arc<Self>, init_lsn: Lsn) {
let timeline_id = self.timeline_id;
@@ -971,7 +1102,7 @@ impl Timeline {
Some((lsn, img))
}
fn get_ancestor_timeline(&self) -> Result<Arc<Timeline>> {
fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
let ancestor = self.ancestor_timeline.as_ref().with_context(|| {
format!(
"Ancestor is missing. Timeline id: {} Ancestor id {:?}",
@@ -1030,14 +1161,14 @@ impl Timeline {
Ok(layer)
}
fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> {
fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> {
//info!("PUT: key {} at {}", key, lsn);
let layer = self.get_layer_for_write(lsn)?;
layer.put_value(key, lsn, val)?;
Ok(())
}
fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> Result<()> {
fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
let layer = self.get_layer_for_write(lsn)?;
layer.put_tombstone(key_range, lsn)?;
@@ -1076,64 +1207,6 @@ impl Timeline {
drop(layers);
}
///
/// Check if more than 'checkpoint_distance' of WAL has been accumulated in
/// the in-memory layer, and initiate flushing it if so.
///
/// Also flush after a period of time without new data -- it helps
/// safekeepers to regard pageserver as caught up and suspend activity.
///
pub fn check_checkpoint_distance(self: &Arc<Timeline>) -> Result<()> {
let last_lsn = self.get_last_record_lsn();
let layers = self.layers.read().unwrap();
if let Some(open_layer) = &layers.open_layer {
let open_layer_size = open_layer.size()?;
drop(layers);
let last_freeze_at = self.last_freeze_at.load();
let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
let distance = last_lsn.widening_sub(last_freeze_at);
// Checkpointing the open layer can be triggered by layer size or LSN range.
// S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
// we want to stay below that with a big margin. The LSN distance determines how
// much WAL the safekeepers need to store.
if distance >= self.get_checkpoint_distance().into()
|| open_layer_size > self.get_checkpoint_distance()
|| (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
{
info!(
"check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
distance,
open_layer_size,
last_freeze_ts.elapsed()
);
self.freeze_inmem_layer(true);
self.last_freeze_at.store(last_lsn);
*(self.last_freeze_ts.write().unwrap()) = Instant::now();
// Launch a task to flush the frozen layer to disk, unless
// a task was already running. (If the task was running
// at the time that we froze the layer, it must've seen the
// the layer we just froze before it exited; see comments
// in flush_frozen_layers())
if let Ok(guard) = self.layer_flush_lock.try_lock() {
drop(guard);
let self_clone = Arc::clone(self);
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::LayerFlushTask,
Some(self.tenant_id),
Some(self.timeline_id),
"layer flush task",
false,
async move { self_clone.flush_frozen_layers(false) },
);
}
}
}
Ok(())
}
/// Flush all frozen layers to disk.
///
/// Only one task at a time can be doing layer-flushing for a
@@ -1141,7 +1214,7 @@ impl Timeline {
/// currently doing the flushing, this function will wait for it
/// to finish. If 'wait' is false, this function will return
/// immediately instead.
fn flush_frozen_layers(&self, wait: bool) -> Result<()> {
fn flush_frozen_layers(&self, wait: bool) -> anyhow::Result<()> {
let flush_lock_guard = if wait {
self.layer_flush_lock.lock().unwrap()
} else {
@@ -1180,7 +1253,7 @@ impl Timeline {
}
/// Flush one frozen in-memory layer to disk, as a new delta layer.
fn flush_frozen_layer(&self, frozen_layer: Arc<InMemoryLayer>) -> Result<()> {
fn flush_frozen_layer(&self, frozen_layer: Arc<InMemoryLayer>) -> anyhow::Result<()> {
// As a special case, when we have just imported an image into the repository,
// instead of writing out a L0 delta layer, we directly write out image layer
// files instead. This is possible as long as *all* the data imported into the
@@ -1238,7 +1311,7 @@ impl Timeline {
&self,
disk_consistent_lsn: Lsn,
layer_paths_to_upload: HashMap<PathBuf, LayerFileMetadata>,
) -> Result<()> {
) -> anyhow::Result<()> {
// We can only save a valid 'prev_record_lsn' value on disk if we
// flushed *all* in-memory changes to disk. We only track
// 'prev_record_lsn' in memory for the latest processed record, so we
@@ -1299,7 +1372,7 @@ impl Timeline {
fn create_delta_layer(
&self,
frozen_layer: &InMemoryLayer,
) -> Result<(PathBuf, LayerFileMetadata)> {
) -> anyhow::Result<(PathBuf, LayerFileMetadata)> {
// Write it out
let new_delta = frozen_layer.write_to_disk()?;
let new_delta_path = new_delta.path();
@@ -1334,92 +1407,7 @@ impl Timeline {
Ok((new_delta_path, LayerFileMetadata::new(sz)))
}
pub fn compact(&self) -> anyhow::Result<()> {
let last_record_lsn = self.get_last_record_lsn();
// Last record Lsn could be zero in case the timelie was just created
if !last_record_lsn.is_valid() {
warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
return Ok(());
}
//
// High level strategy for compaction / image creation:
//
// 1. First, calculate the desired "partitioning" of the
// currently in-use key space. The goal is to partition the
// key space into roughly fixed-size chunks, but also take into
// account any existing image layers, and try to align the
// chunk boundaries with the existing image layers to avoid
// too much churn. Also try to align chunk boundaries with
// relation boundaries. In principle, we don't know about
// relation boundaries here, we just deal with key-value
// pairs, and the code in pgdatadir_mapping.rs knows how to
// map relations into key-value pairs. But in practice we know
// that 'field6' is the block number, and the fields 1-5
// identify a relation. This is just an optimization,
// though.
//
// 2. Once we know the partitioning, for each partition,
// decide if it's time to create a new image layer. The
// criteria is: there has been too much "churn" since the last
// image layer? The "churn" is fuzzy concept, it's a
// combination of too many delta files, or too much WAL in
// total in the delta file. Or perhaps: if creating an image
// file would allow to delete some older files.
//
// 3. After that, we compact all level0 delta files if there
// are too many of them. While compacting, we also garbage
// collect any page versions that are no longer needed because
// of the new image layers we created in step 2.
//
// TODO: This high level strategy hasn't been implemented yet.
// Below are functions compact_level0() and create_image_layers()
// but they are a bit ad hoc and don't quite work like it's explained
// above. Rewrite it.
let _layer_removal_cs = self.layer_removal_cs.lock().unwrap();
let target_file_size = self.get_checkpoint_distance();
// Define partitioning schema if needed
match self.repartition(
self.get_last_record_lsn(),
self.get_compaction_target_size(),
) {
Ok((partitioning, lsn)) => {
// 2. Create new image layers for partitions that have been modified
// "enough".
let layer_paths_to_upload = self.create_image_layers(&partitioning, lsn, false)?;
if !layer_paths_to_upload.is_empty()
&& self.upload_layers.load(atomic::Ordering::Relaxed)
{
storage_sync::schedule_layer_upload(
self.tenant_id,
self.timeline_id,
layer_paths_to_upload,
None,
);
}
// 3. Compact
let timer = self.metrics.compact_time_histo.start_timer();
self.compact_level0(target_file_size)?;
timer.stop_and_record();
}
Err(err) => {
// no partitioning? This is normal, if the timeline was just created
// as an empty timeline. Also in unit tests, when we use the timeline
// as a simple key-value store, ignoring the datadir layout. Log the
// error but continue.
error!("could not compact, repartitioning keyspace failed: {err:?}");
}
};
Ok(())
}
fn repartition(&self, lsn: Lsn, partition_size: u64) -> Result<(KeyPartitioning, Lsn)> {
fn repartition(&self, lsn: Lsn, partition_size: u64) -> anyhow::Result<(KeyPartitioning, Lsn)> {
let mut partitioning_guard = self.partitioning.lock().unwrap();
if partitioning_guard.1 == Lsn(0)
|| lsn.0 - partitioning_guard.1 .0 > self.repartition_threshold
@@ -1433,7 +1421,7 @@ impl Timeline {
}
// Is it time to create a new image layer for the given partition?
fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> Result<bool> {
fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result<bool> {
let layers = self.layers.read().unwrap();
for part_range in &partition.ranges {
@@ -1478,7 +1466,7 @@ impl Timeline {
partitioning: &KeyPartitioning,
lsn: Lsn,
force: bool,
) -> Result<HashMap<PathBuf, LayerFileMetadata>> {
) -> anyhow::Result<HashMap<PathBuf, LayerFileMetadata>> {
let timer = self.metrics.create_images_time_histo.start_timer();
let mut image_layers: Vec<ImageLayer> = Vec::new();
for partition in partitioning.parts.iter() {
@@ -1571,7 +1559,7 @@ impl Timeline {
/// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
/// as Level 1 files.
///
fn compact_level0(&self, target_file_size: u64) -> Result<()> {
fn compact_level0(&self, target_file_size: u64) -> anyhow::Result<()> {
let layers = self.layers.read().unwrap();
let mut level0_deltas = layers.get_level0_deltas()?;
drop(layers);
@@ -1881,12 +1869,12 @@ impl Timeline {
///
/// The 'pitr' duration is used to calculate a 'pitr_cutoff', which can be used to determine
/// whether a record is needed for PITR.
pub fn update_gc_info(
pub(super) fn update_gc_info(
&self,
retain_lsns: Vec<Lsn>,
cutoff_horizon: Lsn,
pitr: Duration,
) -> Result<()> {
) -> anyhow::Result<()> {
let mut gc_info = self.gc_info.write().unwrap();
gc_info.horizon_cutoff = cutoff_horizon;
@@ -1941,7 +1929,7 @@ impl Timeline {
/// within a layer file. We can only remove the whole file if it's fully
/// obsolete.
///
pub fn gc(&self) -> Result<GcResult> {
pub(super) fn gc(&self) -> anyhow::Result<GcResult> {
let mut result: GcResult = Default::default();
let now = SystemTime::now();
@@ -2261,11 +2249,11 @@ impl<'a> TimelineWriter<'a> {
///
/// This will implicitly extend the relation, if the page is beyond the
/// current end-of-file.
pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> Result<()> {
pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
self.tl.put_value(key, lsn, value)
}
pub fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> Result<()> {
pub fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
self.tl.put_tombstone(key_range, lsn)
}

View File

@@ -10,9 +10,6 @@ EXTENSION = neon_test_utils
DATA = neon_test_utils--1.0.sql
PGFILEDESC = "neon_test_utils - helpers for neon testing and debugging"
PG_CPPFLAGS = -I$(libpq_srcdir)
SHLIB_LINK_INTERNAL = $(libpq)
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)

View File

@@ -23,11 +23,6 @@ RETURNS bytea
AS 'MODULE_PATHNAME', 'get_raw_page_at_lsn_ex'
LANGUAGE C PARALLEL UNSAFE;
CREATE FUNCTION neon_seqscan_rel(rel regclass, nprefetch int DEFAULT 0)
RETURNS void
AS 'MODULE_PATHNAME', 'neon_seqscan_rel'
LANGUAGE C PARALLEL UNSAFE;
CREATE FUNCTION neon_xlogflush(lsn pg_lsn)
RETURNS VOID
AS 'MODULE_PATHNAME', 'neon_xlogflush'

View File

@@ -23,13 +23,8 @@
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/varlena.h"
#include "utils/wait_event.h"
#include "../neon/pagestore_client.h"
#include "libpq-fe.h"
#include "libpq/pqformat.h"
#include "libpq/libpq.h"
PG_MODULE_MAGIC;
extern void _PG_init(void);
@@ -39,7 +34,6 @@ PG_FUNCTION_INFO_V1(clear_buffer_cache);
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn);
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn_ex);
PG_FUNCTION_INFO_V1(neon_xlogflush);
PG_FUNCTION_INFO_V1(neon_seqscan_rel);
/*
* Linkage to functions in neon module.
@@ -295,238 +289,6 @@ get_raw_page_at_lsn_ex(PG_FUNCTION_ARGS)
}
}
/*
* A wrapper around PQgetCopyData that checks for interrupts while sleeping.
*/
static int
call_PQgetCopyData(PGconn *conn, char **buffer)
{
int ret;
retry:
ret = PQgetCopyData(conn, buffer, 1 /* async */ );
if (ret == 0)
{
int wc;
/* Sleep until there's something to do */
wc = WaitLatchOrSocket(MyLatch,
WL_LATCH_SET | WL_SOCKET_READABLE |
WL_EXIT_ON_PM_DEATH,
PQsocket(conn),
-1L, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/* Data available in socket? */
if (wc & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(conn))
elog(ERROR, "could not get response from pageserver: %s",
PQerrorMessage(conn));
}
goto retry;
}
return ret;
}
static void send_getpage_request(PGconn *pageserver_conn, RelFileNode rnode, BlockNumber blkno, XLogRecPtr lsn);
/*
* Fetch all pages of given relation. This simulates a sequential scan
* over the table. You can specify the number of blocks to prefetch;
* the function will try to keep that many requests "in flight" at all
* times. The fetched pages are simply discarded.
*/
Datum
neon_seqscan_rel(PG_FUNCTION_ARGS)
{
Oid relid = PG_GETARG_OID(0);
Oid nprefetch = PG_GETARG_INT32(1);
Relation rel;
char *raw_page_data;
BlockNumber nblocks;
PGconn *pageserver_conn;
XLogRecPtr read_lsn;
if (!superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to use raw page functions")));
rel = relation_open(relid, AccessShareLock);
nblocks = RelationGetNumberOfBlocks(rel);
pageserver_conn = PQconnectdb(page_server_connstring);
if (PQstatus(pageserver_conn) == CONNECTION_BAD)
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
PQfinish(pageserver_conn);
ereport(ERROR,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
errmsg("could not establish connection to pageserver"),
errdetail_internal("%s", msg)));
}
PG_TRY();
{
char *query;
int ret;
StringInfoData resp_buff;
read_lsn = GetXLogInsertRecPtr();
query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
ret = PQsendQuery(pageserver_conn, query);
if (ret != 1)
{
PQfinish(pageserver_conn);
pageserver_conn = NULL;
elog(ERROR, "could not send pagestream command to pageserver");
}
while (PQisBusy(pageserver_conn))
{
int wc;
/* Sleep until there's something to do */
wc = WaitLatchOrSocket(MyLatch,
WL_LATCH_SET | WL_SOCKET_READABLE |
WL_EXIT_ON_PM_DEATH,
PQsocket(pageserver_conn),
-1L, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/* Data available in socket? */
if (wc & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(pageserver_conn))
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
PQfinish(pageserver_conn);
pageserver_conn = NULL;
elog(ERROR, "could not complete handshake with pageserver: %s",
msg);
}
}
}
elog(INFO, "scanning %u blocks, prefetch %u", nblocks, nprefetch);
BlockNumber nsent = 0;
for (BlockNumber blkno = 0; blkno < nblocks; blkno++)
{
NeonGetPageRequest request = {
.req.tag = T_NeonGetPageRequest,
.req.latest = true,
.req.lsn = read_lsn,
.rnode = rel->rd_node,
.forknum = MAIN_FORKNUM,
.blkno = blkno
};
NeonResponse *resp;
if (blkno % 1024 == 0)
elog(INFO, "blk %u/%u", blkno, nblocks);
if (nsent < blkno + nprefetch + 1 && nsent < nblocks)
{
while (nsent < blkno + nprefetch + 1 && nsent < nblocks)
send_getpage_request(pageserver_conn, rel->rd_node, nsent++, read_lsn);
if (PQflush(pageserver_conn))
{
char *msg = PQerrorMessage(pageserver_conn);
elog(ERROR, "failed to flush page requests: %s", msg);
}
}
/* read response */
resp_buff.len = call_PQgetCopyData(pageserver_conn, &resp_buff.data);
resp_buff.cursor = 0;
if (resp_buff.len < 0)
{
if (resp_buff.len == -1)
elog(ERROR, "end of COPY");
else if (resp_buff.len == -2)
elog(ERROR, "could not read COPY data: %s", PQerrorMessage(pageserver_conn));
}
resp = nm_unpack_response(&resp_buff);
switch (resp->tag)
{
case T_NeonGetPageResponse:
/* ok */
break;
case T_NeonErrorResponse:
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg("could not read block %u", blkno),
errdetail("page server returned error: %s",
((NeonErrorResponse *) resp)->message)));
break;
default:
elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
}
PQfreemem(resp_buff.data);
}
}
PG_CATCH();
{
PQfinish(pageserver_conn);
PG_RE_THROW();
}
PG_END_TRY();
relation_close(rel, AccessShareLock);
}
static void
send_getpage_request(PGconn *pageserver_conn, RelFileNode rnode, BlockNumber blkno, XLogRecPtr lsn)
{
NeonGetPageRequest request = {
.req.tag = T_NeonGetPageRequest,
.req.latest = true,
.req.lsn = lsn,
.rnode = rnode,
.forknum = MAIN_FORKNUM,
.blkno = blkno
};
StringInfoData req_buff;
req_buff = nm_pack_request(&request.req);
/*
* Send request.
*
* In principle, this could block if the output buffer is full, and we
* should use async mode and check for interrupts while waiting. In
* practice, our requests are small enough to always fit in the output and
* TCP buffer.
*/
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
{
char *msg = PQerrorMessage(pageserver_conn);
elog(ERROR, "failed to send page request: %s", msg);
}
pfree(req_buff.data);
}
/*
* Directly calls XLogFlush(lsn) to flush WAL buffers.
*/