Compare commits

..

29 Commits

Author SHA1 Message Date
Arseny Sher
51f672b0bb Rename write_message to write_message_noflush in postgres_backend_async.rs
To make it unifrom across the project; proxy stream.rs and older
postgres_backend uses write_message_noflush.
2023-03-01 20:05:56 +04:00
sharnoff
1360361f60 Fix missing VM cgconfig.conf (#3718)
It was being added to the wrong stage in the dockerfile. This should fix
it, and resolves an ongoing issue on staging.
2023-02-28 21:11:00 -08:00
Alexander Bayandin
000eb1b069 Bump tempfile from 3.3.0 to 3.4.0 (#3709)
Update `tempfile` crate to get rid of `remove_dir_all` dependency
Ref https://github.com/neondatabase/neon/security/dependabot/15
2023-02-27 12:44:08 +00:00
Heikki Linnakangas
f51b48fa49 Fix UNLOGGED tables.
Instead of trying to create missing files on the way, send init fork contents as
main fork from pageserver during basebackup. Add test for that. Call
put_rel_drop for init forks; previously they weren't removed. Bump
vendor/postgres to revert previous approach on Postgres side.

Co-authored-by: Arseny Sher <sher-ars@yandex.ru>

ref https://github.com/neondatabase/postgres/pull/264
ref https://github.com/neondatabase/postgres/pull/259
ref https://github.com/neondatabase/neon/issues/1222
2023-02-24 23:30:02 +04:00
Sergey Melnikov
9f906ff236 Add pageserver-2.us-east-2.aws.neon.tech (#3701) 2023-02-23 19:56:21 +01:00
Sam Kleinman
c79dd8d458 compute_ctl: support for fetching spec from control plane (#3610) 2023-02-23 13:19:39 -05:00
Vadim Kharitonov
ec4ecdd543 Enable postgres SPI extensions 2023-02-23 16:49:37 +01:00
MMeent
20a4d817ce Update vendored PostgreSQL versions to 14.7 and 15.2 (#3581)
## Describe your changes
Rebase vendored PostgreSQL onto 14.7 and 15.2

## Issue ticket number and link

#3579

## Checklist before requesting a review
- [x] I have performed a self-review of my code.
- [x] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [x] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.
    ```
The version of PostgreSQL that we use is updated to 14.7 for PostgreSQL
14 and 15.2 for PostgreSQL 15.
    ```
2023-02-23 16:10:22 +02:00
Vadim Kharitonov
5ebf7e5619 Fix pg_jsonschema and pg_graphql 2023-02-23 10:43:46 +01:00
Arseny Sher
0692fffbf3 Bump vendor/postgres to include hotfix for unlogged tables with indexes.
https://github.com/neondatabase/postgres/pull/259
https://github.com/neondatabase/postgres/pull/262
2023-02-23 01:34:59 +04:00
Vadim Kharitonov
093570af20 Compile pg_hashids extension 2023-02-22 21:00:25 +01:00
Dmitry Rodionov
eb403da814 Use debug level for successful GET http requests (#3681)
We started rather frequently scrap some apis for metadata. This includes
layer eviction tester, I believe console does that too.

It should eliminate these logs:
https://neonprod.grafana.net/goto/rr_ace1Vz?orgId=1 (Note the rate
around 2k messages per minute)
2023-02-22 22:19:05 +03:00
Vadim Kharitonov
f3ad635911 Compile pgrouting extension 2023-02-22 20:16:11 +01:00
Vadim Kharitonov
a8d7360881 Compile hypopg extension 2023-02-22 20:14:30 +01:00
Lassi Pölönen
b0311cfdeb Change the production neon-proxy-scram update strategy to RollingUpdate (#3683)
## Describe your changes
The same change in production as was done in staging by
https://github.com/neondatabase/neon/pull/3678

## Issue ticket number and link
https://github.com/neondatabase/neon/issues/3333
2023-02-22 20:15:37 +02:00
Konstantin Knizhnik
412e0aa985 Skip largest N holes during compaction (#3597)
## Describe your changes

This is yet another attempt to address problem with storage size
ballooning #2948
Previous PR #3348 tries to address this problem by maintaining list of
holes for each layer.
The problem with this approach is that we have to load all layer on
pageserver start.
Lazy loading of layers is not possible any more.

This PR tries to collect information of N largest holes on compaction
time and exclude this holes from produced layers.
It can cause generation of larger number of layers (up to 2 times) and
producing small layers.
But it requires minimal changes in code and doesn't affect storage
format.

For graphical explanation please see thread:
https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451

## Issue ticket number and link

#2948
#3348

## Checklist before requesting a review
- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.
2023-02-22 18:28:01 +02:00
Lassi Pölönen
965b4f4ae2 Change the staging neon-proxy-scram update strategy to RollingUpdate (#3678)
## Describe your changes
When we deploy the proxy with the default Recreate strategy, there's
always some downtime and existing connections will be shut down. Change
the strategy to RollingUpdate and delay the kill signal by one week. AWS
Network Loadbalancer keeps the existing connections alive for as long as
the pods are alive, but will direct new connections to new pods.


## Issue ticket number and link
https://github.com/neondatabase/neon/issues/3333
2023-02-22 16:50:07 +02:00
Arthur Petukhovsky
95018672fa Remove safekeeper-1.ap-southeast-1.aws.neon.tech (#3671)
We migrated all timelines to
`safekeeper-3.ap-southeast-1.aws.neon.tech`, now old instance can be
removed.
2023-02-22 11:55:41 +02:00
Sergey Melnikov
2caece2077 Add -v to ansible invocations (#3670)
To get more debug output on failures
2023-02-21 23:11:52 +03:00
Joonas Koivunen
b8b8c19fb4 fix: hold permit until GetObject eof (#3663)
previously we applied the ratelimiting only up to receiving the headers
from s3, or somewhere near it. the commit adds an adapter which carries
the permit until the AsyncRead has been disposed.

fixes #3662.
2023-02-21 21:14:08 +02:00
Joonas Koivunen
225add041f calculate_logical_size: no longer use spawn_blocking (#3664)
Calculation of logical size is now async because of layer downloads, so
we shouldn't use spawn_blocking for it. Use of `spawn_blocking`
exhausted resources which are needed by `tokio::io::copy` when copying
from a stream to a file which lead to deadlock.

Fixes: #3657
2023-02-21 21:09:31 +02:00
Joonas Koivunen
5d001b1e5a chore: ignore all compaction inactive tenant errors (#3665)
these are happening in tests because of #3655 but they sure took some
time to appear.

makes the `Compaction failed, retrying in 2s: Cannot run compaction
iteration on inactive tenant` into a globally allowed error, because it
has been seen failing on different test cases.
2023-02-21 20:20:13 +02:00
Joonas Koivunen
fe462de85b fix: log download failed error (#3661)
Fixes #3659
2023-02-21 19:31:53 +02:00
Vadim Kharitonov
c0de7f5cd8 Build pg_jsonschema and pg_graphql extensions (#3535)
## Describe your changes
Layer for building pg extensions written on Rust

It required forking:
* `cargo-pgx` (in order not to catch an ABI mismatch error (`cargo-pgx`
hardcoded ABI tcdi/pgx#1032)
* `pg_jsonschema` (to use forked `cargo-pgx` version)
* `pgx-contrib-spiext` (to use forked `cargo-pgx`)
* `pg_graphql` (to use forked `cargo-pgx` and `pgx-contrib-spiext`
version)

Before the patch:

```
postgres=# create extension pg_jsonschema;
2023-02-02 17:45:23.120 UTC [35] ERROR:  incompatible library "/usr/local/lib/pg_jsonschema.so": ABI mismatch
2023-02-02 17:45:23.120 UTC [35] DETAIL:  Server has ABI "Neon Postgres", library has "PostgreSQL".
2023-02-02 17:45:23.120 UTC [35] STATEMENT:  create extension pg_jsonschema;
ERROR:  incompatible library "/usr/local/lib/pg_jsonschema.so": ABI mismatch
DETAIL:  Server has ABI "Neon Postgres", library has "PostgreSQL".
```

After 

```
postgres=# create extension pg_jsonschema;
CREATE EXTENSION
postgres=# select json_matches_schema('{"type": "object"}', '{}');
 json_matches_schema
---------------------
 t
postgres=# create extension pg_graphql;
CREATE EXTENSION
postgres=# create table book(id int primary key, title text);
CREATE TABLE
postgres=# insert into book(id, title) values (1, 'book 1');
INSERT 0 1
postgres=# select graphql.resolve($$
query {
  bookCollection {
    edges {
      node {
        id
      }
    }
  }
}
$$);
                            resolve
----------------------------------------------------------------
 {"data": {"bookCollection": {"edges": [{"node": {"id": 1}}]}}}
(1 row)
```

## Issue ticket number and link
Closes #3429, #3096

## Checklist before requesting a review
- [x] I have performed a self-review of my code.
- [x] If it is a core feature, I have added thorough tests.
- [x] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [x] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

`pg_jsonschema` extension will be available for our customers
2023-02-21 17:31:23 +01:00
Joonas Koivunen
b220ba6cd1 add random init delay for background tasks (#3655)
Fixes #3649.
2023-02-21 12:42:11 +01:00
Joonas Koivunen
7de373210d Warn when background tasks exceed their configured period (#3654)
Fixes #3648.
2023-02-21 13:02:19 +02:00
Vadim Kharitonov
5c5b03ce08 Compile xml2 extension 2023-02-21 10:34:45 +01:00
Joonas Koivunen
d7d3f451f0 Use tracing panic hook in all binaries (#3634)
Enables tracing panic hook in addition to pageserver introduced in
#3475:

- proxy
- safekeeper
- storage_broker

For proxy, a drop guard which resets the original std panic hook was
added on the first commit. Other binaries don't need it so they never
reset anything by `disarm`ing the drop guard.

The aim of the change is to make sure all panics a) have span
information b) are logged similar to other messages, not interleaved
with other messages as happens right now. Interleaving happens right now
because std prints panics to stderr, and other logging happens in
stdout. If this was handled gracefully by some utility, the log message
splitter would treat panics as belonging to the previous message because
it expects a message to start with a timestamp.

Cc: #3468
2023-02-21 10:03:55 +02:00
Keanu Ashwell
bc7d3c6476 docs: add dependency requirements for arch based systems (#3588)
This pull request adds information on building neon on Arch based system
such as Artix, Manjaro, Antergos, etc.
2023-02-20 22:51:54 +03:00
39 changed files with 892 additions and 292 deletions

View File

@@ -32,8 +32,6 @@ storage:
hosts:
safekeeper-0.ap-southeast-1.aws.neon.tech:
ansible_host: i-0d6f1dc5161eef894
safekeeper-1.ap-southeast-1.aws.neon.tech:
ansible_host: i-0e338adda8eb2d19f
safekeeper-2.ap-southeast-1.aws.neon.tech:
ansible_host: i-04fb63634e4679eb9
safekeeper-3.ap-southeast-1.aws.neon.tech:

View File

@@ -27,6 +27,8 @@ storage:
ansible_host: i-062227ba7f119eb8c
pageserver-1.us-east-2.aws.neon.tech:
ansible_host: i-0b3ec0afab5968938
pageserver-2.us-east-2.aws.neon.tech:
ansible_host: i-0d7a1c4325e71421d
safekeepers:
hosts:

View File

@@ -1,6 +1,21 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 604800"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon

View File

@@ -1,6 +1,21 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 604800"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon

View File

@@ -1,6 +1,22 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 604800"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon

View File

@@ -1,6 +1,22 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 604800"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon

View File

@@ -1,6 +1,22 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 604800"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon

View File

@@ -1,6 +1,22 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 604800"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon

View File

@@ -67,7 +67,7 @@ jobs:
./get_binaries.sh
ansible-galaxy collection install sivel.toiletwater
ansible-playbook deploy.yaml -i staging.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_STAGING_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
ansible-playbook -v deploy.yaml -i staging.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_STAGING_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
rm -f neon_install.tar.gz .neon_current_version
- name: Cleanup ansible folder

View File

@@ -68,7 +68,7 @@ jobs:
./get_binaries.sh
ansible-galaxy collection install sivel.toiletwater
ansible-playbook deploy.yaml -i prod.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_PRODUCTION_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
ansible-playbook -v deploy.yaml -i prod.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_PRODUCTION_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
rm -f neon_install.tar.gz .neon_current_version
deploy-proxy-prod-new:

20
Cargo.lock generated
View File

@@ -854,6 +854,7 @@ dependencies = [
"opentelemetry",
"postgres",
"regex",
"reqwest",
"serde",
"serde_json",
"tar",
@@ -3054,6 +3055,7 @@ dependencies = [
"hyper",
"metrics",
"once_cell",
"pin-project-lite",
"serde",
"serde_json",
"tempfile",
@@ -3065,15 +3067,6 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "remove_dir_all"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7"
dependencies = [
"winapi",
]
[[package]]
name = "reqwest"
version = "0.11.14"
@@ -3847,16 +3840,15 @@ dependencies = [
[[package]]
name = "tempfile"
version = "3.3.0"
version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5cdb1ef4eaeeaddc8fbd371e5017057064af0911902ef36b39801f67cc6d79e4"
checksum = "af18f7ae1acd354b992402e9ec5864359d693cd8a79dcbef59f76891701c1e95"
dependencies = [
"cfg-if",
"fastrand",
"libc",
"redox_syscall",
"remove_dir_all",
"winapi",
"rustix",
"windows-sys 0.42.0",
]
[[package]]

View File

@@ -150,7 +150,7 @@ workspace_hack = { version = "0.1", path = "./workspace_hack/" }
criterion = "0.4"
rcgen = "0.10"
rstest = "0.16"
tempfile = "3.2"
tempfile = "3.4"
tonic-build = "0.8"
# This is only needed for proxy's tests.

View File

@@ -1,3 +1,4 @@
ARG PG_VERSION
ARG REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
ARG IMAGE=rust
ARG TAG=pinned
@@ -11,7 +12,7 @@ FROM debian:bullseye-slim AS build-deps
RUN apt update && \
apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev \
zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget pkg-config libssl-dev \
libicu-dev
libicu-dev libxslt1-dev
#########################################################################################
#
@@ -23,18 +24,24 @@ FROM build-deps AS pg-build
ARG PG_VERSION
COPY vendor/postgres-${PG_VERSION} postgres
RUN cd postgres && \
./configure CFLAGS='-O2 -g3' --enable-debug --with-openssl --with-uuid=ossp --with-icu && \
./configure CFLAGS='-O2 -g3' --enable-debug --with-openssl --with-uuid=ossp --with-icu \
--with-libxml --with-libxslt && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s install && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C contrib/ install && \
# Install headers
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C src/include install && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C src/interfaces/libpq install && \
# Enable some of contrib extensions
echo 'trusted = true' >> /usr/local/pgsql/share/extension/autoinc.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/bloom.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgrowlocks.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/earthdistance.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/insert_username.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/intagg.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/moddatetime.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgrowlocks.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgstattuple.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/earthdistance.control
echo 'trusted = true' >> /usr/local/pgsql/share/extension/refint.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/xml2.control
#########################################################################################
#
@@ -57,10 +64,11 @@ RUN wget https://gitlab.com/Oslandia/SFCGAL/-/archive/v1.3.10/SFCGAL-v1.3.10.tar
DESTDIR=/sfcgal make install -j $(getconf _NPROCESSORS_ONLN) && \
make clean && cp -R /sfcgal/* /
ENV PATH "/usr/local/pgsql/bin:$PATH"
RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.2.tar.gz -O postgis.tar.gz && \
mkdir postgis-src && cd postgis-src && tar xvzf ../postgis.tar.gz --strip-components=1 -C . && \
./autogen.sh && \
export PATH="/usr/local/pgsql/bin:$PATH" && \
./configure --with-sfcgal=/usr/local/bin/sfcgal-config && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
cd extensions/postgis && \
@@ -74,6 +82,15 @@ RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.2.tar.gz -O postg
echo 'trusted = true' >> /usr/local/pgsql/share/extension/address_standardizer.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/address_standardizer_data_us.control
RUN wget https://github.com/pgRouting/pgrouting/archive/v3.4.2.tar.gz -O pgrouting.tar.gz && \
mkdir pgrouting-src && cd pgrouting-src && tar xvzf ../pgrouting.tar.gz --strip-components=1 -C . && \
mkdir build && \
cd build && \
cmake .. && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgrouting.control
#########################################################################################
#
# Layer "plv8-build"
@@ -178,6 +195,96 @@ RUN wget https://github.com/michelp/pgjwt/archive/9742dab1b2f297ad3811120db7b214
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgjwt.control
#########################################################################################
#
# Layer "hypopg-pg-build"
# compile hypopg extension
#
#########################################################################################
FROM build-deps AS hypopg-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/HypoPG/hypopg/archive/refs/tags/1.3.1.tar.gz -O hypopg.tar.gz && \
mkdir hypopg-src && cd hypopg-src && tar xvzf ../hypopg.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/hypopg.control
#########################################################################################
#
# Layer "pg-hashids-pg-build"
# compile pg_hashids extension
#
#########################################################################################
FROM build-deps AS pg-hashids-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/iCyberon/pg_hashids/archive/refs/tags/v1.2.1.tar.gz -O pg_hashids.tar.gz && \
mkdir pg_hashids-src && cd pg_hashids-src && tar xvzf ../pg_hashids.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_hashids.control
#########################################################################################
#
# Layer "rust extensions"
# This layer is used to build `pgx` deps
#
#########################################################################################
FROM build-deps AS rust-extensions-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN apt-get update && \
apt-get install -y curl libclang-dev cmake && \
useradd -ms /bin/bash nonroot -b /home
ENV HOME=/home/nonroot
ENV PATH="/home/nonroot/.cargo/bin:/usr/local/pgsql/bin/:$PATH"
USER nonroot
WORKDIR /home/nonroot
ARG PG_VERSION
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && \
chmod +x rustup-init && \
./rustup-init -y --no-modify-path --profile minimal --default-toolchain stable && \
rm rustup-init && \
cargo install --git https://github.com/vadim2404/pgx --branch neon_abi_v0.6.1 --locked cargo-pgx && \
/bin/bash -c 'cargo pgx init --pg${PG_VERSION:1}=/usr/local/pgsql/bin/pg_config'
USER root
#########################################################################################
#
# Layer "pg-jsonschema-pg-build"
# Compile "pg_jsonschema" extension
#
#########################################################################################
FROM rust-extensions-build AS pg-jsonschema-pg-build
RUN git clone --depth=1 --single-branch --branch neon_abi_v0.1.4 https://github.com/vadim2404/pg_jsonschema/ && \
cd pg_jsonschema && \
cargo pgx install --release && \
# it's needed to enable extension because it uses untrusted C language
sed -i 's/superuser = false/superuser = true/g' /usr/local/pgsql/share/extension/pg_jsonschema.control && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_jsonschema.control
#########################################################################################
#
# Layer "pg-graphql-pg-build"
# Compile "pg_graphql" extension
#
#########################################################################################
FROM rust-extensions-build AS pg-graphql-pg-build
RUN git clone --depth=1 --single-branch --branch neon_abi_v1.1.0 https://github.com/vadim2404/pg_graphql && \
cd pg_graphql && \
cargo pgx install --release && \
# it's needed to enable extension because it uses untrusted C language
sed -i 's/superuser = false/superuser = true/g' /usr/local/pgsql/share/extension/pg_graphql.control && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_graphql.control
#########################################################################################
#
# Layer "neon-pg-ext-build"
@@ -193,6 +300,10 @@ COPY --from=h3-pg-build /h3/usr /
COPY --from=unit-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=vector-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pgjwt-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-jsonschema-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-graphql-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=hypopg-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-hashids-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) \
@@ -255,6 +366,7 @@ COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-deb
# libicu67, locales for collations (including ICU)
# libossp-uuid16 for extension ossp-uuid
# libgeos, libgdal, libsfcgal1, libproj and libprotobuf-c1 for PostGIS
# libxml2, libxslt1.1 for xml2
RUN apt update && \
apt install --no-install-recommends -y \
locales \
@@ -266,6 +378,8 @@ RUN apt update && \
libproj19 \
libprotobuf-c1 \
libsfcgal1 \
libxml2 \
libxslt1.1 \
gdb && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8

View File

@@ -10,7 +10,6 @@ RUN set -e \
&& rm -f /etc/inittab \
&& touch /etc/inittab
ADD vm-cgconfig.conf /etc/cgconfig.conf
RUN set -e \
&& echo "::sysinit:cgconfigparser -l /etc/cgconfig.conf -s 1664" >> /etc/inittab \
&& echo "::respawn:su vm-informant -c '/usr/local/bin/vm-informant --auto-restart --cgroup=neon-postgres'" >> /etc/inittab
@@ -26,6 +25,7 @@ RUN apt update && \
RUN adduser vm-informant --disabled-password --no-create-home
USER postgres
ADD vm-cgconfig.conf /etc/cgconfig.conf
COPY --from=informant /etc/inittab /etc/inittab
COPY --from=informant /usr/bin/vm-informant /usr/local/bin/vm-informant

View File

@@ -34,6 +34,11 @@ dnf install flex bison readline-devel zlib-devel openssl-devel \
libseccomp-devel perl clang cmake postgresql postgresql-contrib protobuf-compiler \
protobuf-devel
```
* On Arch based systems, these packages are needed:
```bash
pacman -S base-devel readline zlib libseccomp openssl clang \
postgresql-libs cmake postgresql protobuf
```
2. [Install Rust](https://www.rust-lang.org/tools/install)
```

View File

@@ -17,6 +17,7 @@ regex.workspace = true
serde.workspace = true
serde_json.workspace = true
tar.workspace = true
reqwest = { workspace = true, features = ["json"] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
tokio-postgres.workspace = true
tracing.workspace = true

View File

@@ -65,6 +65,9 @@ fn main() -> Result<()> {
let spec = matches.get_one::<String>("spec");
let spec_path = matches.get_one::<String>("spec-path");
let compute_id = matches.get_one::<String>("compute-id");
let control_plane_uri = matches.get_one::<String>("control-plane-uri");
// Try to use just 'postgres' if no path is provided
let pgbin = matches.get_one::<String>("pgbin").unwrap();
@@ -77,8 +80,27 @@ fn main() -> Result<()> {
let path = Path::new(sp);
let file = File::open(path)?;
serde_json::from_reader(file)?
} else if let Some(id) = compute_id {
if let Some(cp_base) = control_plane_uri {
let cp_uri = format!("{cp_base}/management/api/v1/{id}/spec");
let jwt: String = match std::env::var("NEON_CONSOLE_JWT") {
Ok(v) => v,
Err(_) => "".to_string(),
};
reqwest::blocking::Client::new()
.get(cp_uri)
.header("Authorization", jwt)
.send()?
.json()?
} else {
panic!(
"must specify --control-plane-uri \"{:#?}\" and --compute-id \"{:#?}\"",
control_plane_uri, compute_id
);
}
} else {
panic!("cluster spec should be provided via --spec or --spec-path argument");
panic!("compute spec should be provided via --spec or --spec-path argument");
}
}
};
@@ -227,6 +249,18 @@ fn cli() -> clap::Command {
.long("spec-path")
.value_name("SPEC_PATH"),
)
.arg(
Arg::new("compute-id")
.short('i')
.long("compute-id")
.value_name("COMPUTE_ID"),
)
.arg(
Arg::new("control-plane-uri")
.short('p')
.long("control-plane-uri")
.value_name("CONTROL_PLANE"),
)
}
#[test]

View File

@@ -1,41 +1,5 @@
# Synthetic size
## How to get the data
Pageserver provides a HTTP API for getting the synthetic size of a tenant
along with the data that was used to calculate it. Usage examples:
1. This query returns the synthetic size of the tenant, along with the "raw" data
That is returned in the `segments` and `timeline_inputs` fields.
```
curl localhost:9898/v1/tenant/5e1de642394b00a0a583a088e8276b98/synthetic_size | jq
```
2. If `inputs_only=true` is passed, the response will contain only the raw data.
Actual synthetic size is not calculated.
```
curl localhost:9898/v1/tenant/5e1de642394b00a0a583a088e8276b98/synthetic_size?inputs_only=true | jq
```
3. 'retention_period' is a cutoff (in bytes) that overrides the cutoff that is used in the size calculation.
Note, that override is applied only if provided `retnention_period` is shorter than the real cutoff.
```
curl localhost:9898/v1/tenant/5e1de642394b00a0a583a088e8276b98/synthetic_size?retention_period=1048576 | jq
```
4. If header `Accept: text/html` is passed, the response will be in HTML format.
The HTML contains a json with the same data as in the previous examples + SVG diagram of the tenant timelines.
```
curl -H "Accept: text/html" localhost:9898/v1/tenant/5e1de642394b00a0a583a088e8276b98/synthetic_size > ./size.html | google-chrome ./size.html
```
## Overview
Neon storage has copy-on-write branching, which makes it difficult to
answer the question "how large is my database"? To give one reasonable
answer, we calculate _synthetic size_ for a project.

View File

@@ -98,6 +98,15 @@ impl RelTag {
name
}
pub fn with_forknum(&self, forknum: u8) -> Self {
RelTag {
forknum,
spcnode: self.spcnode,
dbnode: self.dbnode,
relnode: self.relnode,
}
}
}
///

View File

@@ -21,7 +21,7 @@ toml_edit.workspace = true
tracing.workspace = true
metrics.workspace = true
utils.workspace = true
pin-project-lite.workspace = true
workspace_hack.workspace = true
[dev-dependencies]

View File

@@ -20,7 +20,10 @@ use aws_sdk_s3::{
};
use aws_smithy_http::body::SdkBody;
use hyper::Body;
use tokio::{io, sync::Semaphore};
use tokio::{
io::{self, AsyncRead},
sync::Semaphore,
};
use tokio_util::io::ReaderStream;
use tracing::debug;
@@ -102,7 +105,7 @@ pub struct S3Bucket {
// Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
// Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold.
// The helps to ensure we don't exceed the thresholds.
concurrency_limiter: Semaphore,
concurrency_limiter: Arc<Semaphore>,
}
#[derive(Default)]
@@ -162,7 +165,7 @@ impl S3Bucket {
client,
bucket_name: aws_config.bucket_name.clone(),
prefix_in_bucket,
concurrency_limiter: Semaphore::new(aws_config.concurrency_limit.get()),
concurrency_limiter: Arc::new(Semaphore::new(aws_config.concurrency_limit.get())),
})
}
@@ -194,9 +197,10 @@ impl S3Bucket {
}
async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
let _guard = self
let permit = self
.concurrency_limiter
.acquire()
.clone()
.acquire_owned()
.await
.context("Concurrency limiter semaphore got closed during S3 download")
.map_err(DownloadError::Other)?;
@@ -217,9 +221,10 @@ impl S3Bucket {
let metadata = object_output.metadata().cloned().map(StorageMetadata);
Ok(Download {
metadata,
download_stream: Box::pin(io::BufReader::new(
download_stream: Box::pin(io::BufReader::new(RatelimitedAsyncRead::new(
permit,
object_output.body.into_async_read(),
)),
))),
})
}
Err(SdkError::ServiceError {
@@ -240,6 +245,32 @@ impl S3Bucket {
}
}
pin_project_lite::pin_project! {
/// An `AsyncRead` adapter which carries a permit for the lifetime of the value.
struct RatelimitedAsyncRead<S> {
permit: tokio::sync::OwnedSemaphorePermit,
#[pin]
inner: S,
}
}
impl<S: AsyncRead> RatelimitedAsyncRead<S> {
fn new(permit: tokio::sync::OwnedSemaphorePermit, inner: S) -> Self {
RatelimitedAsyncRead { permit, inner }
}
}
impl<S: AsyncRead> AsyncRead for RatelimitedAsyncRead<S> {
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
let this = self.project();
this.inner.poll_read(cx, buf)
}
}
#[async_trait::async_trait]
impl RemoteStorage for S3Bucket {
async fn list(&self) -> anyhow::Result<Vec<RemotePath>> {

View File

@@ -4,13 +4,14 @@ use anyhow::{anyhow, Context};
use hyper::header::{HeaderName, AUTHORIZATION};
use hyper::http::HeaderValue;
use hyper::{header::CONTENT_TYPE, Body, Request, Response, Server};
use hyper::{Method, StatusCode};
use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder};
use once_cell::sync::Lazy;
use routerify::ext::RequestExt;
use routerify::RequestInfo;
use routerify::{Middleware, Router, RouterBuilder, RouterService};
use tokio::task::JoinError;
use tracing::info;
use tracing;
use std::future::Future;
use std::net::TcpListener;
@@ -27,7 +28,14 @@ static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
});
async fn logger(res: Response<Body>, info: RequestInfo) -> Result<Response<Body>, ApiError> {
info!("{} {} {}", info.method(), info.uri().path(), res.status(),);
// cannot factor out the Level to avoid the repetition
// because tracing can only work with const Level
// which is not the case here
if info.method() == Method::GET && res.status() == StatusCode::OK {
tracing::debug!("{} {} {}", info.method(), info.uri().path(), res.status());
} else {
tracing::info!("{} {} {}", info.method(), info.uri().path(), res.status());
}
Ok(res)
}
@@ -203,7 +211,7 @@ pub fn serve_thread_main<S>(
where
S: Future<Output = ()> + Send + Sync,
{
info!("Starting an HTTP endpoint at {}", listener.local_addr()?);
tracing::info!("Starting an HTTP endpoint at {}", listener.local_addr()?);
// Create a Service from the router above to handle incoming requests.
let service = RouterService::new(router_builder.build().map_err(|err| anyhow!(err))?).unwrap();

View File

@@ -45,3 +45,115 @@ pub fn init(log_format: LogFormat) -> anyhow::Result<()> {
Ok(())
}
/// Disable the default rust panic hook by using `set_hook`.
///
/// For neon binaries, the assumption is that tracing is configured before with [`init`], after
/// that sentry is configured (if needed). sentry will install it's own on top of this, always
/// processing the panic before we log it.
///
/// When the return value is dropped, the hook is reverted to std default hook (prints to stderr).
/// If the assumptions about the initialization order are not held, use
/// [`TracingPanicHookGuard::disarm`] but keep in mind, if tracing is stopped, then panics will be
/// lost.
#[must_use]
pub fn replace_panic_hook_with_tracing_panic_hook() -> TracingPanicHookGuard {
std::panic::set_hook(Box::new(tracing_panic_hook));
TracingPanicHookGuard::new()
}
/// Drop guard which restores the std panic hook on drop.
///
/// Tracing should not be used when it's not configured, but we cannot really latch on to any
/// imaginary lifetime of tracing.
pub struct TracingPanicHookGuard {
act: bool,
}
impl TracingPanicHookGuard {
fn new() -> Self {
TracingPanicHookGuard { act: true }
}
/// Make this hook guard not do anything when dropped.
pub fn forget(&mut self) {
self.act = false;
}
}
impl Drop for TracingPanicHookGuard {
fn drop(&mut self) {
if self.act {
let _ = std::panic::take_hook();
}
}
}
/// Named symbol for our panic hook, which logs the panic.
fn tracing_panic_hook(info: &std::panic::PanicInfo) {
// following rust 1.66.1 std implementation:
// https://github.com/rust-lang/rust/blob/90743e7298aca107ddaa0c202a4d3604e29bfeb6/library/std/src/panicking.rs#L235-L288
let location = info.location();
let msg = match info.payload().downcast_ref::<&'static str>() {
Some(s) => *s,
None => match info.payload().downcast_ref::<String>() {
Some(s) => &s[..],
None => "Box<dyn Any>",
},
};
let thread = std::thread::current();
let thread = thread.name().unwrap_or("<unnamed>");
let backtrace = std::backtrace::Backtrace::capture();
let _entered = if let Some(location) = location {
tracing::error_span!("panic", %thread, location = %PrettyLocation(location))
} else {
// very unlikely to hit here, but the guarantees of std could change
tracing::error_span!("panic", %thread)
}
.entered();
if backtrace.status() == std::backtrace::BacktraceStatus::Captured {
// this has an annoying extra '\n' in the end which anyhow doesn't do, but we cannot really
// get rid of it as we cannot get in between of std::fmt::Formatter<'_>; we could format to
// string, maybe even to a TLS one but tracing already does that.
tracing::error!("{msg}\n\nStack backtrace:\n{backtrace}");
} else {
tracing::error!("{msg}");
}
// ensure that we log something on the panic if this hook is left after tracing has been
// unconfigured. worst case when teardown is racing the panic is to log the panic twice.
tracing::dispatcher::get_default(|d| {
if let Some(_none) = d.downcast_ref::<tracing::subscriber::NoSubscriber>() {
let location = location.map(PrettyLocation);
log_panic_to_stderr(thread, msg, location, &backtrace);
}
});
}
#[cold]
fn log_panic_to_stderr(
thread: &str,
msg: &str,
location: Option<PrettyLocation<'_, '_>>,
backtrace: &std::backtrace::Backtrace,
) {
eprintln!("panic while tracing is unconfigured: thread '{thread}' panicked at '{msg}', {location:?}\nStack backtrace:\n{backtrace}");
}
struct PrettyLocation<'a, 'b>(&'a std::panic::Location<'b>);
impl std::fmt::Display for PrettyLocation<'_, '_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}:{}:{}", self.0.file(), self.0.line(), self.0.column())
}
}
impl std::fmt::Debug for PrettyLocation<'_, '_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
<Self as std::fmt::Display>::fmt(self, f)
}
}

View File

@@ -233,7 +233,7 @@ impl PostgresBackend {
}
/// Write message into internal output buffer.
pub fn write_message(&mut self, message: &BeMessage<'_>) -> io::Result<&mut Self> {
pub fn write_message_noflush(&mut self, message: &BeMessage<'_>) -> io::Result<&mut Self> {
BeMessage::write(&mut self.buf_out, message)?;
Ok(self)
}
@@ -383,7 +383,7 @@ impl PostgresBackend {
FeStartupPacket::SslRequest => {
debug!("SSL requested");
self.write_message(&BeMessage::EncryptionResponse(have_tls))?;
self.write_message_noflush(&BeMessage::EncryptionResponse(have_tls))?;
if have_tls {
self.start_tls().await?;
self.state = ProtoState::Encrypted;
@@ -391,11 +391,11 @@ impl PostgresBackend {
}
FeStartupPacket::GssEncRequest => {
debug!("GSS requested");
self.write_message(&BeMessage::EncryptionResponse(false))?;
self.write_message_noflush(&BeMessage::EncryptionResponse(false))?;
}
FeStartupPacket::StartupMessage { .. } => {
if have_tls && !matches!(self.state, ProtoState::Encrypted) {
self.write_message(&BeMessage::ErrorResponse(
self.write_message_noflush(&BeMessage::ErrorResponse(
"must connect with TLS",
None,
))?;
@@ -410,15 +410,17 @@ impl PostgresBackend {
match self.auth_type {
AuthType::Trust => {
self.write_message(&BeMessage::AuthenticationOk)?
.write_message(&BeMessage::CLIENT_ENCODING)?
self.write_message_noflush(&BeMessage::AuthenticationOk)?
.write_message_noflush(&BeMessage::CLIENT_ENCODING)?
// The async python driver requires a valid server_version
.write_message(&BeMessage::server_version("14.1"))?
.write_message(&BeMessage::ReadyForQuery)?;
.write_message_noflush(&BeMessage::server_version("14.1"))?
.write_message_noflush(&BeMessage::ReadyForQuery)?;
self.state = ProtoState::Established;
}
AuthType::NeonJWT => {
self.write_message(&BeMessage::AuthenticationCleartextPassword)?;
self.write_message_noflush(
&BeMessage::AuthenticationCleartextPassword,
)?;
self.state = ProtoState::Authentication;
}
}
@@ -441,7 +443,7 @@ impl PostgresBackend {
let (_, jwt_response) = m.split_last().context("protocol violation")?;
if let Err(e) = handler.check_auth_jwt(self, jwt_response) {
self.write_message(&BeMessage::ErrorResponse(
self.write_message_noflush(&BeMessage::ErrorResponse(
&e.to_string(),
Some(e.pg_error_code()),
))?;
@@ -449,9 +451,9 @@ impl PostgresBackend {
}
}
}
self.write_message(&BeMessage::AuthenticationOk)?
.write_message(&BeMessage::CLIENT_ENCODING)?
.write_message(&BeMessage::ReadyForQuery)?;
self.write_message_noflush(&BeMessage::AuthenticationOk)?
.write_message_noflush(&BeMessage::CLIENT_ENCODING)?
.write_message_noflush(&BeMessage::ReadyForQuery)?;
self.state = ProtoState::Established;
}
@@ -486,30 +488,30 @@ impl PostgresBackend {
if let Err(e) = handler.process_query(self, query_string).await {
log_query_error(query_string, &e);
let short_error = short_error(&e);
self.write_message(&BeMessage::ErrorResponse(
self.write_message_noflush(&BeMessage::ErrorResponse(
&short_error,
Some(e.pg_error_code()),
))?;
}
self.write_message(&BeMessage::ReadyForQuery)?;
self.write_message_noflush(&BeMessage::ReadyForQuery)?;
}
FeMessage::Parse(m) => {
*unnamed_query_string = m.query_string;
self.write_message(&BeMessage::ParseComplete)?;
self.write_message_noflush(&BeMessage::ParseComplete)?;
}
FeMessage::Describe(_) => {
self.write_message(&BeMessage::ParameterDescription)?
.write_message(&BeMessage::NoData)?;
self.write_message_noflush(&BeMessage::ParameterDescription)?
.write_message_noflush(&BeMessage::NoData)?;
}
FeMessage::Bind(_) => {
self.write_message(&BeMessage::BindComplete)?;
self.write_message_noflush(&BeMessage::BindComplete)?;
}
FeMessage::Close(_) => {
self.write_message(&BeMessage::CloseComplete)?;
self.write_message_noflush(&BeMessage::CloseComplete)?;
}
FeMessage::Execute(_) => {
@@ -517,7 +519,7 @@ impl PostgresBackend {
trace!("got execute {query_string:?}");
if let Err(e) = handler.process_query(self, query_string).await {
log_query_error(query_string, &e);
self.write_message(&BeMessage::ErrorResponse(
self.write_message_noflush(&BeMessage::ErrorResponse(
&e.to_string(),
Some(e.pg_error_code()),
))?;
@@ -529,7 +531,7 @@ impl PostgresBackend {
}
FeMessage::Sync => {
self.write_message(&BeMessage::ReadyForQuery)?;
self.write_message_noflush(&BeMessage::ReadyForQuery)?;
}
FeMessage::Terminate => {
@@ -579,7 +581,7 @@ impl<'a> AsyncWrite for CopyDataWriter<'a> {
// XXX: if the input is large, we should split it into multiple messages.
// Not sure what the threshold should be, but the ultimate hard limit is that
// the length cannot exceed u32.
this.pgb.write_message(&BeMessage::CopyData(buf))?;
this.pgb.write_message_noflush(&BeMessage::CopyData(buf))?;
Poll::Ready(Ok(buf.len()))
}

View File

@@ -33,6 +33,7 @@ use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
use postgres_ffi::pg_constants::{PGDATA_SPECIAL_FILES, PGDATA_SUBDIRS, PG_HBA};
use postgres_ffi::relfile_utils::{INIT_FORKNUM, MAIN_FORKNUM};
use postgres_ffi::TransactionId;
use postgres_ffi::XLogFileName;
use postgres_ffi::PG_TLI;
@@ -190,14 +191,31 @@ where
{
self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;
// Gather and send relational files in each database if full backup is requested.
if self.full_backup {
for rel in self
.timeline
.list_rels(spcnode, dbnode, self.lsn, self.ctx)
.await?
{
self.add_rel(rel).await?;
// If full backup is requested, include all relation files.
// Otherwise only include init forks of unlogged relations.
let rels = self
.timeline
.list_rels(spcnode, dbnode, self.lsn, self.ctx)
.await?;
for &rel in rels.iter() {
// Send init fork as main fork to provide well formed empty
// contents of UNLOGGED relations. Postgres copies it in
// `reinit.c` during recovery.
if rel.forknum == INIT_FORKNUM {
// I doubt we need _init fork itself, but having it at least
// serves as a marker relation is unlogged.
self.add_rel(rel, rel).await?;
self.add_rel(rel, rel.with_forknum(MAIN_FORKNUM)).await?;
continue;
}
if self.full_backup {
if rel.forknum == MAIN_FORKNUM && rels.contains(&rel.with_forknum(INIT_FORKNUM))
{
// skip this, will include it when we reach the init fork
continue;
}
self.add_rel(rel, rel).await?;
}
}
}
@@ -220,15 +238,16 @@ where
Ok(())
}
async fn add_rel(&mut self, tag: RelTag) -> anyhow::Result<()> {
/// Add contents of relfilenode `src`, naming it as `dst`.
async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> anyhow::Result<()> {
let nblocks = self
.timeline
.get_rel_size(tag, self.lsn, false, self.ctx)
.get_rel_size(src, self.lsn, false, self.ctx)
.await?;
// If the relation is empty, create an empty file
if nblocks == 0 {
let file_name = tag.to_segfile_name(0);
let file_name = dst.to_segfile_name(0);
let header = new_tar_header(&file_name, 0)?;
self.ar.append(&header, &mut io::empty()).await?;
return Ok(());
@@ -244,12 +263,12 @@ where
for blknum in startblk..endblk {
let img = self
.timeline
.get_rel_page_at_lsn(tag, blknum, self.lsn, false, self.ctx)
.get_rel_page_at_lsn(src, blknum, self.lsn, false, self.ctx)
.await?;
segment_data.extend_from_slice(&img[..]);
}
let file_name = tag.to_segfile_name(seg as u32);
let file_name = dst.to_segfile_name(seg as u32);
let header = new_tar_header(&file_name, segment_data.len() as u64)?;
self.ar.append(&header, segment_data.as_slice()).await?;

View File

@@ -91,9 +91,9 @@ fn main() -> anyhow::Result<()> {
// Initialize logging, which must be initialized before the custom panic hook is installed.
logging::init(conf.log_format)?;
// disable the default rust panic hook by using `set_hook`. sentry will install it's own on top
// of this, always processing the panic before we log it.
std::panic::set_hook(Box::new(tracing_panic_hook));
// mind the order required here: 1. logging, 2. panic_hook, 3. sentry.
// disarming this hook on pageserver, because we never tear down tracing.
logging::replace_panic_hook_with_tracing_panic_hook().forget();
// initialize sentry if SENTRY_DSN is provided
let _sentry_guard = init_sentry(
@@ -499,50 +499,6 @@ fn cli() -> Command {
)
}
/// Named symbol for our panic hook, which logs the panic.
fn tracing_panic_hook(info: &std::panic::PanicInfo) {
// following rust 1.66.1 std implementation:
// https://github.com/rust-lang/rust/blob/90743e7298aca107ddaa0c202a4d3604e29bfeb6/library/std/src/panicking.rs#L235-L288
let location = info.location();
let msg = match info.payload().downcast_ref::<&'static str>() {
Some(s) => *s,
None => match info.payload().downcast_ref::<String>() {
Some(s) => &s[..],
None => "Box<dyn Any>",
},
};
let thread = std::thread::current();
let thread = thread.name().unwrap_or("<unnamed>");
let backtrace = std::backtrace::Backtrace::capture();
struct PrettyLocation<'a, 'b>(&'a std::panic::Location<'b>);
impl std::fmt::Display for PrettyLocation<'_, '_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}:{}:{}", self.0.file(), self.0.line(), self.0.column())
}
}
let _entered = if let Some(location) = location {
tracing::error_span!("panic", %thread, location = %PrettyLocation(location))
} else {
// very unlikely to hit here, but the guarantees of std could change
tracing::error_span!("panic", %thread)
}
.entered();
if backtrace.status() == std::backtrace::BacktraceStatus::Captured {
// this has an annoying extra '\n' in the end which anyhow doesn't do, but we cannot really
// get rid of it as we cannot get in between of std::fmt::Formatter<'_>; we could format to
// string, maybe even to a TLS one but tracing already does that.
tracing::error!("{msg}\n\nStack backtrace:\n{backtrace}");
} else {
tracing::error!("{msg}");
}
}
#[test]
fn verify_cli() {
cli().debug_assert();

View File

@@ -64,7 +64,7 @@ fn copyin_stream(pgb: &mut PostgresBackend) -> impl Stream<Item = io::Result<Byt
_ = task_mgr::shutdown_watcher() => {
// We were requested to shut down.
let msg = format!("pageserver is shutting down");
let _ = pgb.write_message(&BeMessage::ErrorResponse(&msg, None));
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None));
Err(QueryError::Other(anyhow::anyhow!(msg)))
}
@@ -80,13 +80,13 @@ fn copyin_stream(pgb: &mut PostgresBackend) -> impl Stream<Item = io::Result<Byt
FeMessage::Terminate => {
let msg = "client terminated connection with Terminate message during COPY";
let query_error_error = QueryError::Disconnected(ConnectionError::Socket(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
pgb.write_message(&BeMessage::ErrorResponse(msg, Some(query_error_error.pg_error_code())))?;
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error_error.pg_error_code())))?;
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
break;
}
m => {
let msg = format!("unexpected message {m:?}");
pgb.write_message(&BeMessage::ErrorResponse(&msg, None))?;
pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None))?;
Err(io::Error::new(io::ErrorKind::Other, msg))?;
break;
}
@@ -97,7 +97,7 @@ fn copyin_stream(pgb: &mut PostgresBackend) -> impl Stream<Item = io::Result<Byt
Ok(None) => {
let msg = "client closed connection during COPY";
let query_error_error = QueryError::Disconnected(ConnectionError::Socket(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
pgb.write_message(&BeMessage::ErrorResponse(msg, Some(query_error_error.pg_error_code())))?;
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error_error.pg_error_code())))?;
pgb.flush().await?;
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
}
@@ -311,7 +311,7 @@ impl PageServerHandler {
let timeline = tenant.get_timeline(timeline_id, true)?;
// switch client to COPYBOTH
pgb.write_message(&BeMessage::CopyBothResponse)?;
pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
pgb.flush().await?;
let metrics = PageRequestMetrics::new(&tenant_id, &timeline_id);
@@ -380,7 +380,7 @@ impl PageServerHandler {
})
});
pgb.write_message(&BeMessage::CopyData(&response.serialize()))?;
pgb.write_message_noflush(&BeMessage::CopyData(&response.serialize()))?;
pgb.flush().await?;
}
Ok(())
@@ -416,7 +416,7 @@ impl PageServerHandler {
// Import basebackup provided via CopyData
info!("importing basebackup");
pgb.write_message(&BeMessage::CopyInResponse)?;
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
pgb.flush().await?;
let mut copyin_stream = Box::pin(copyin_stream(pgb));
@@ -468,7 +468,7 @@ impl PageServerHandler {
// Import wal provided via CopyData
info!("importing wal");
pgb.write_message(&BeMessage::CopyInResponse)?;
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
pgb.flush().await?;
let mut copyin_stream = Box::pin(copyin_stream(pgb));
let mut reader = tokio_util::io::StreamReader::new(&mut copyin_stream);
@@ -678,7 +678,7 @@ impl PageServerHandler {
}
// switch client to COPYOUT
pgb.write_message(&BeMessage::CopyOutResponse)?;
pgb.write_message_noflush(&BeMessage::CopyOutResponse)?;
pgb.flush().await?;
// Send a tarball of the latest layer on the timeline
@@ -695,7 +695,7 @@ impl PageServerHandler {
.await?;
}
pgb.write_message(&BeMessage::CopyDone)?;
pgb.write_message_noflush(&BeMessage::CopyDone)?;
pgb.flush().await?;
info!("basebackup complete");
@@ -812,7 +812,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
// Check that the timeline exists
self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false, ctx)
.await?;
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
}
// return pair of prev_lsn and last_lsn
else if query_string.starts_with("get_last_record_rlsn ") {
@@ -835,15 +835,15 @@ impl postgres_backend_async::Handler for PageServerHandler {
let end_of_timeline = timeline.get_last_record_rlsn();
pgb.write_message(&BeMessage::RowDescription(&[
pgb.write_message_noflush(&BeMessage::RowDescription(&[
RowDescriptor::text_col(b"prev_lsn"),
RowDescriptor::text_col(b"last_lsn"),
]))?
.write_message(&BeMessage::DataRow(&[
.write_message_noflush(&BeMessage::DataRow(&[
Some(end_of_timeline.prev.to_string().as_bytes()),
Some(end_of_timeline.last.to_string().as_bytes()),
]))?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
}
// same as basebackup, but result includes relational data as well
else if query_string.starts_with("fullbackup ") {
@@ -884,7 +884,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
// Check that the timeline exists
self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, prev_lsn, true, ctx)
.await?;
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("import basebackup ") {
// Import the `base` section (everything but the wal) of a basebackup.
// Assumes the tenant already exists on this pageserver.
@@ -929,10 +929,10 @@ impl postgres_backend_async::Handler for PageServerHandler {
)
.await
{
Ok(()) => pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?,
Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
Err(e) => {
error!("error importing base backup between {base_lsn} and {end_lsn}: {e:?}");
pgb.write_message(&BeMessage::ErrorResponse(
pgb.write_message_noflush(&BeMessage::ErrorResponse(
&e.to_string(),
Some(e.pg_error_code()),
))?
@@ -965,10 +965,10 @@ impl postgres_backend_async::Handler for PageServerHandler {
.handle_import_wal(pgb, tenant_id, timeline_id, start_lsn, end_lsn, ctx)
.await
{
Ok(()) => pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?,
Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
Err(e) => {
error!("error importing WAL between {start_lsn} and {end_lsn}: {e:?}");
pgb.write_message(&BeMessage::ErrorResponse(
pgb.write_message_noflush(&BeMessage::ErrorResponse(
&e.to_string(),
Some(e.pg_error_code()),
))?
@@ -977,7 +977,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
} else if query_string.to_ascii_lowercase().starts_with("set ") {
// important because psycopg2 executes "SET datestyle TO 'ISO'"
// on connect
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("show ") {
// show <tenant_id>
let (_, params_raw) = query_string.split_at("show ".len());
@@ -993,7 +993,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
self.check_permission(Some(tenant_id))?;
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
pgb.write_message(&BeMessage::RowDescription(&[
pgb.write_message_noflush(&BeMessage::RowDescription(&[
RowDescriptor::int8_col(b"checkpoint_distance"),
RowDescriptor::int8_col(b"checkpoint_timeout"),
RowDescriptor::int8_col(b"compaction_target_size"),
@@ -1004,7 +1004,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
RowDescriptor::int8_col(b"image_creation_threshold"),
RowDescriptor::int8_col(b"pitr_interval"),
]))?
.write_message(&BeMessage::DataRow(&[
.write_message_noflush(&BeMessage::DataRow(&[
Some(tenant.get_checkpoint_distance().to_string().as_bytes()),
Some(
tenant
@@ -1027,7 +1027,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
Some(tenant.get_image_creation_threshold().to_string().as_bytes()),
Some(tenant.get_pitr_interval().as_secs().to_string().as_bytes()),
]))?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else {
return Err(QueryError::Other(anyhow::anyhow!(
"unknown command {query_string}"

View File

@@ -3,7 +3,7 @@
use std::ops::ControlFlow;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::TENANT_TASK_EVENTS;
@@ -11,6 +11,7 @@ use crate::task_mgr;
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::mgr;
use crate::tenant::{Tenant, TenantState};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::id::TenantId;
@@ -53,37 +54,55 @@ async fn compaction_loop(tenant_id: TenantId) {
info!("starting");
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
let cancel = task_mgr::shutdown_token();
let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
let mut first = true;
loop {
trace!("waking up");
let tenant = tokio::select! {
_ = task_mgr::shutdown_watcher() => {
_ = cancel.cancelled() => {
info!("received cancellation request");
return;
return;
},
tenant_wait_result = wait_for_active_tenant(tenant_id, wait_duration) => match tenant_wait_result {
ControlFlow::Break(()) => return,
ControlFlow::Continue(tenant) => tenant,
},
};
};
let mut sleep_duration = tenant.get_compaction_period();
if sleep_duration == Duration::ZERO {
info!("automatic compaction is disabled");
// check again in 10 seconds, in case it's been enabled again.
sleep_duration = Duration::from_secs(10);
} else {
// Run compaction
if let Err(e) = tenant.compaction_iteration(&ctx).await {
sleep_duration = wait_duration;
error!("Compaction failed, retrying in {:?}: {e:?}", sleep_duration);
let period = tenant.get_compaction_period();
// TODO: we shouldn't need to await to find tenant and this could be moved outside of
// loop, #3501. There are also additional "allowed_errors" in tests.
if first {
first = false;
if random_init_delay(period, &cancel).await.is_err() {
break;
}
}
let started_at = Instant::now();
let sleep_duration = if period == Duration::ZERO {
info!("automatic compaction is disabled");
// check again in 10 seconds, in case it's been enabled again.
Duration::from_secs(10)
} else {
// Run compaction
if let Err(e) = tenant.compaction_iteration(&ctx).await {
error!("Compaction failed, retrying in {:?}: {e:?}", wait_duration);
wait_duration
} else {
period
}
};
warn_when_period_overrun(started_at.elapsed(), period, "compaction");
// Sleep
tokio::select! {
_ = task_mgr::shutdown_watcher() => {
_ = cancel.cancelled() => {
info!("received cancellation request during idling");
break;
},
@@ -105,14 +124,16 @@ async fn gc_loop(tenant_id: TenantId) {
info!("starting");
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
let cancel = task_mgr::shutdown_token();
// GC might require downloading, to find the cutoff LSN that corresponds to the
// cutoff specified as time.
let ctx = RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
let mut first = true;
loop {
trace!("waking up");
let tenant = tokio::select! {
_ = task_mgr::shutdown_watcher() => {
_ = cancel.cancelled() => {
info!("received cancellation request");
return;
},
@@ -122,27 +143,38 @@ async fn gc_loop(tenant_id: TenantId) {
},
};
let gc_period = tenant.get_gc_period();
let gc_horizon = tenant.get_gc_horizon();
let mut sleep_duration = gc_period;
if sleep_duration == Duration::ZERO {
info!("automatic GC is disabled");
// check again in 10 seconds, in case it's been enabled again.
sleep_duration = Duration::from_secs(10);
} else {
// Run gc
if gc_horizon > 0 {
if let Err(e) = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx).await
{
sleep_duration = wait_duration;
error!("Gc failed, retrying in {:?}: {e:?}", sleep_duration);
}
let period = tenant.get_gc_period();
if first {
first = false;
if random_init_delay(period, &cancel).await.is_err() {
break;
}
}
let started_at = Instant::now();
let gc_horizon = tenant.get_gc_horizon();
let sleep_duration = if period == Duration::ZERO || gc_horizon == 0 {
info!("automatic GC is disabled");
// check again in 10 seconds, in case it's been enabled again.
Duration::from_secs(10)
} else {
// Run gc
let res = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx).await;
if let Err(e) = res {
error!("Gc failed, retrying in {:?}: {e:?}", wait_duration);
wait_duration
} else {
period
}
};
warn_when_period_overrun(started_at.elapsed(), period, "gc");
// Sleep
tokio::select! {
_ = task_mgr::shutdown_watcher() => {
_ = cancel.cancelled() => {
info!("received cancellation request during idling");
break;
},
@@ -197,3 +229,49 @@ async fn wait_for_active_tenant(
}
}
}
#[derive(thiserror::Error, Debug)]
#[error("cancelled")]
pub(crate) struct Cancelled;
/// Provide a random delay for background task initialization.
///
/// This delay prevents a thundering herd of background tasks and will likely keep them running on
/// different periods for more stable load.
pub(crate) async fn random_init_delay(
period: Duration,
cancel: &CancellationToken,
) -> Result<(), Cancelled> {
use rand::Rng;
let d = {
let mut rng = rand::thread_rng();
// gen_range asserts that the range cannot be empty, which it could be because period can
// be set to zero to disable gc or compaction, so lets set it to be at least 10s.
let period = std::cmp::max(period, Duration::from_secs(10));
// semi-ok default as the source of jitter
rng.gen_range(Duration::ZERO..=period)
};
tokio::select! {
_ = cancel.cancelled() => Err(Cancelled),
_ = tokio::time::sleep(d) => Ok(()),
}
}
pub(crate) fn warn_when_period_overrun(elapsed: Duration, period: Duration, task: &str) {
// Duration::ZERO will happen because it's the "disable [bgtask]" value.
if elapsed >= period && period != Duration::ZERO {
// humantime does no significant digits clamping whereas Duration's debug is a bit more
// intelligent. however it makes sense to keep the "configuration format" for period, even
// though there's no way to output the actual config value.
warn!(
?elapsed,
period = %humantime::format_duration(period),
task,
"task iteration took longer than the configured period"
);
}
}

View File

@@ -19,6 +19,7 @@ use tracing::*;
use utils::id::TenantTimelineId;
use std::cmp::{max, min, Ordering};
use std::collections::BinaryHeap;
use std::collections::HashMap;
use std::fs;
use std::ops::{Deref, Range};
@@ -82,6 +83,25 @@ enum FlushLoopState {
Exited,
}
/// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Hole {
key_range: Range<Key>,
coverage_size: usize,
}
impl Ord for Hole {
fn cmp(&self, other: &Self) -> Ordering {
other.coverage_size.cmp(&self.coverage_size) // inverse order
}
}
impl PartialOrd for Hole {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
pub struct Timeline {
conf: &'static PageServerConf,
tenant_conf: Arc<RwLock<TenantConfOpt>>,
@@ -1770,15 +1790,9 @@ impl Timeline {
let calculation = async {
let cancel = cancel.child_token();
let ctx = ctx.attached_child();
tokio::task::spawn_blocking(move || {
// Run in a separate thread since this can do a lot of
// synchronous file IO without .await inbetween
// if there are no RemoteLayers that would require downloading.
let h = tokio::runtime::Handle::current();
h.block_on(self_calculation.calculate_logical_size(init_lsn, cancel, &ctx))
})
.await
.context("Failed to spawn calculation result task")?
self_calculation
.calculate_logical_size(init_lsn, cancel, &ctx)
.await
};
let timeline_state_cancellation = async {
loop {
@@ -1811,7 +1825,7 @@ impl Timeline {
tokio::pin!(calculation);
loop {
tokio::select! {
res = &mut calculation => { return res }
res = &mut calculation => { return res }
reason = timeline_state_cancellation => {
debug!(reason = reason, "cancelling calculation");
cancel.cancel();
@@ -2947,6 +2961,47 @@ impl Timeline {
},
)?;
// Determine N largest holes where N is number of compacted layers.
let max_holes = deltas_to_compact.len();
let last_record_lsn = self.get_last_record_lsn();
let layers = self.layers.read().unwrap(); // Is'n it better to hold original layers lock till here?
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
let min_hole_coverage_size = 3; // TODO: something more flexible?
// min-heap (reserve space for one more element added before eviction)
let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
let mut prev: Option<Key> = None;
for (next_key, _next_lsn, _size) in itertools::process_results(
deltas_to_compact.iter().map(|l| l.key_iter(ctx)),
|iter_iter| iter_iter.kmerge_by(|a, b| a.0 <= b.0),
)? {
if let Some(prev_key) = prev {
// just first fast filter
if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
let key_range = prev_key..next_key;
// Measuring hole by just subtraction of i128 representation of key range boundaries
// has not so much sense, because largest holes will corresponds field1/field2 changes.
// But we are mostly interested to eliminate holes which cause generation of excessive image layers.
// That is why it is better to measure size of hole as number of covering image layers.
let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len();
if coverage_size >= min_hole_coverage_size {
heap.push(Hole {
key_range,
coverage_size,
});
if heap.len() > max_holes {
heap.pop(); // remove smallest hole
}
}
}
}
prev = Some(next_key.next());
}
drop(layers);
let mut holes = heap.into_vec();
holes.sort_unstable_by_key(|hole| hole.key_range.start);
let mut next_hole = 0; // index of next hole in holes vector
// Merge the contents of all the input delta layers into a new set
// of delta layers, based on the current partitioning.
//
@@ -3041,14 +3096,22 @@ impl Timeline {
}
if writer.is_some() {
let written_size = writer.as_mut().unwrap().size();
// check if key cause layer overflow...
let contains_hole =
next_hole < holes.len() && key >= holes[next_hole].key_range.end;
// check if key cause layer overflow or contains hole...
if is_dup_layer
|| dup_end_lsn.is_valid()
|| written_size + key_values_total_size > target_file_size
|| contains_hole
{
// ... if so, flush previous layer and prepare to write new one
new_layers.push(writer.take().unwrap().finish(prev_key.unwrap().next())?);
writer = None;
if contains_hole {
// skip hole
next_hole += 1;
}
}
}
// Remember size of key value because at next iteration we will access next item
@@ -3745,6 +3808,7 @@ impl Timeline {
remote_layer.ongoing_download.close();
} else {
// Keep semaphore open. We'll drop the permit at the end of the function.
info!("on-demand download failed: {:?}", result.as_ref().unwrap_err());
}
// Don't treat it as an error if the task that triggered the download

View File

@@ -41,9 +41,23 @@ impl Timeline {
#[instrument(skip_all, fields(tenant_id = %self.tenant_id, timeline_id = %self.timeline_id))]
async fn eviction_task(self: Arc<Self>, cancel: CancellationToken) {
use crate::tenant::tasks::random_init_delay;
{
let policy = self.get_eviction_policy();
let period = match policy {
EvictionPolicy::LayerAccessThreshold(lat) => lat.period,
EvictionPolicy::NoEviction => Duration::from_secs(10),
};
if random_init_delay(period, &cancel).await.is_err() {
info!("shutting down");
return;
}
}
loop {
let policy = self.get_eviction_policy();
let cf = self.eviction_iteration(&policy, cancel.clone()).await;
match cf {
ControlFlow::Break(()) => break,
ControlFlow::Continue(sleep_until) => {
@@ -78,13 +92,7 @@ impl Timeline {
ControlFlow::Continue(()) => (),
}
let elapsed = start.elapsed();
if elapsed > p.period {
warn!(
configured_period = %humantime::format_duration(p.period),
last_period = %humantime::format_duration(elapsed),
"this eviction period took longer than the configured period"
);
}
crate::tenant::tasks::warn_when_period_overrun(elapsed, p.period, "eviction");
ControlFlow::Continue(start + p.period)
}
}

View File

@@ -37,7 +37,7 @@ use crate::walrecord::*;
use crate::ZERO_PAGE;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
use postgres_ffi::v14::xlog_utils::*;
use postgres_ffi::v14::CheckPoint;
@@ -762,7 +762,7 @@ impl<'a> WalIngest<'a> {
)?;
for xnode in &parsed.xnodes {
for forknum in MAIN_FORKNUM..=VISIBILITYMAP_FORKNUM {
for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
let rel = RelTag {
forknum,
spcnode: xnode.spcnode,

View File

@@ -43,6 +43,7 @@ async fn flatten_err(
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let _logging_guard = logging::init().await?;
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
info!("Version: {GIT_VERSION}");

View File

@@ -126,7 +126,12 @@ fn main() -> anyhow::Result<()> {
return Ok(());
}
// important to keep the order of:
// 1. init logging
// 2. tracing panic hook
// 3. sentry
logging::init(LogFormat::from_config(&args.log_format)?)?;
logging::replace_panic_hook_with_tracing_panic_hook().forget();
info!("version: {GIT_VERSION}");
let args_workdir = &args.datadir;

View File

@@ -424,12 +424,16 @@ async fn http1_handler(
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// initialize sentry if SENTRY_DSN is provided
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
let args = Args::parse();
// important to keep the order of:
// 1. init logging
// 2. tracing panic hook
// 3. sentry
logging::init(LogFormat::from_config(&args.log_format)?)?;
logging::replace_panic_hook_with_tracing_panic_hook().forget();
// initialize sentry if SENTRY_DSN is provided
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
info!("version: {GIT_VERSION}");
::metrics::set_build_info_metric(GIT_VERSION);

View File

@@ -1669,7 +1669,7 @@ class AbstractNeonCli(abc.ABC):
timeout=timeout,
)
if not res.returncode:
log.info(f"Run success: {res.stdout}")
log.info(f"Run {res.args} success: {res.stdout}")
elif check_return_code:
# this way command output will be in recorded and shown in CI in failure message
msg = f"""\
@@ -2079,6 +2079,9 @@ class NeonPageserver(PgProtocol):
".*compaction_loop.*Compaction failed, retrying in.*timeline is Stopping", # When compaction checks timeline state after acquiring layer_removal_cs
".*query handler for 'pagestream.*failed: Timeline .* was not found", # postgres reconnects while timeline_delete doesn't hold the tenant's timelines.lock()
".*query handler for 'pagestream.*failed: Timeline .* is not active", # timeline delete in progress
".*task iteration took longer than the configured period.*",
# this is until #3501
".*Compaction failed, retrying in [^:]+: Cannot run compaction iteration on inactive tenant",
]
def start(
@@ -3460,6 +3463,14 @@ def wait_for_last_flush_lsn(
return wait_for_last_record_lsn(env.pageserver.http_client(), tenant, timeline, last_flush_lsn)
def wait_for_wal_insert_lsn(
env: NeonEnv, pg: Postgres, tenant: TenantId, timeline: TimelineId
) -> Lsn:
"""Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn."""
last_flush_lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_insert_lsn()")[0][0])
return wait_for_last_record_lsn(env.pageserver.http_client(), tenant, timeline, last_flush_lsn)
def fork_at_current_lsn(
env: NeonEnv,
pg: Postgres,

View File

@@ -3,8 +3,15 @@ from typing import List, Tuple
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, wait_for_last_flush_lsn
from fixtures.types import Lsn
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
PageserverHttpClient,
Postgres,
wait_for_last_flush_lsn,
wait_for_wal_insert_lsn,
)
from fixtures.types import Lsn, TenantId, TimelineId
def test_empty_tenant_size(neon_simple_env: NeonEnv, test_output_dir: Path):
@@ -324,7 +331,7 @@ def test_single_branch_get_tenant_size_grows(
# inserts is larger than gc_horizon. for example 0x20000 here hid the fact
# that there next_gc_cutoff could be smaller than initdb_lsn, which will
# obviously lead to issues when calculating the size.
gc_horizon = 0x30000
gc_horizon = 0x38000
neon_env_builder.pageserver_config_override = f"tenant_config={{compaction_period='0s', gc_period='0s', pitr_interval='0sec', gc_horizon={gc_horizon}}}"
env = neon_env_builder.init_start()
@@ -334,29 +341,75 @@ def test_single_branch_get_tenant_size_grows(
http_client = env.pageserver.http_client()
collected_responses: List[Tuple[Lsn, int]] = []
collected_responses: List[Tuple[str, Lsn, int]] = []
size_debug_file = open(test_output_dir / "size_debug.html", "w")
def check_size_change(current_lsn: Lsn, initdb_lsn: Lsn, gc_horizon: int, size: int, prev: int):
if current_lsn - initdb_lsn > gc_horizon:
def check_size_change(
current_lsn: Lsn, initdb_lsn: Lsn, gc_horizon: int, size: int, prev_size: int
):
if current_lsn - initdb_lsn >= gc_horizon:
assert (
size >= prev
size >= prev_size
), "tenant_size may grow or not grow, because we only add gc_horizon amount of WAL to initial snapshot size"
else:
assert (
size > prev
size > prev_size
), "tenant_size should grow, because we continue to add WAL to initial snapshot size"
with env.postgres.create_start(branch_name, tenant_id=tenant_id) as pg:
initdb_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
def get_current_consistent_size(
env: NeonEnv,
pg: Postgres,
size_debug_file, # apparently there is no public signature for open()...
http_client: PageserverHttpClient,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Tuple[Lsn, int]:
consistent = False
size_debug = None
current_lsn = wait_for_wal_insert_lsn(env, pg, tenant_id, timeline_id)
# We want to make sure we have a self-consistent set of values.
# Size changes with WAL, so only if both before and after getting
# the size of the tenant reports the same WAL insert LSN, we're OK
# to use that (size, LSN) combination.
# Note that 'wait_for_wal_flush_lsn' is not accurate enough: There
# can be more wal after the flush LSN that can arrive on the
# pageserver before we're requesting the page size.
# Anyway, in general this is only one iteration, so in general
# this is fine.
while not consistent:
size, sizes = http_client.tenant_size_and_modelinputs(tenant_id)
size_debug = http_client.tenant_size_debug(tenant_id)
after_lsn = wait_for_wal_insert_lsn(env, pg, tenant_id, timeline_id)
consistent = current_lsn == after_lsn
current_lsn = after_lsn
size_debug_file.write(size_debug)
return (current_lsn, size)
with env.postgres.create_start(
branch_name,
tenant_id=tenant_id,
### autovacuum is disabled to limit WAL logging.
config_lines=["autovacuum=off"],
) as pg:
(initdb_lsn, size) = get_current_consistent_size(
env, pg, size_debug_file, http_client, tenant_id, timeline_id
)
collected_responses.append(("INITDB", initdb_lsn, size))
with pg.cursor() as cur:
cur.execute("CREATE TABLE t0 (i BIGINT NOT NULL)")
cur.execute("CREATE TABLE t0 (i BIGINT NOT NULL) WITH (fillfactor = 40)")
(current_lsn, size) = get_current_consistent_size(
env, pg, size_debug_file, http_client, tenant_id, timeline_id
)
collected_responses.append(("CREATE", current_lsn, size))
batch_size = 100
i = 0
while True:
for i in range(3):
with pg.cursor() as cur:
cur.execute(
f"INSERT INTO t0(i) SELECT i FROM generate_series({batch_size} * %s, ({batch_size} * (%s + 1)) - 1) s(i)",
@@ -365,27 +418,24 @@ def test_single_branch_get_tenant_size_grows(
i += 1
current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
(current_lsn, size) = get_current_consistent_size(
env, pg, size_debug_file, http_client, tenant_id, timeline_id
)
size, sizes = http_client.tenant_size_and_modelinputs(tenant_id)
prev_size = collected_responses[-1][2]
if size == 0:
assert prev_size == 0
else:
# branch start shouldn't be past gc_horizon yet
# thus the size should grow as we insert more data
# "gc_horizon" is tuned so that it kicks in _after_ the
# insert phase, but before the update phase ends.
assert (
current_lsn - initdb_lsn <= gc_horizon
), "Tuning of GC window is likely out-of-date"
assert size > prev_size
size_debug = http_client.tenant_size_debug(tenant_id)
size_debug_file.write(size_debug)
if len(collected_responses) > 0:
prev = collected_responses[-1][1]
if size == 0:
assert prev == 0
else:
# branch start shouldn't be past gc_horizon yet
# thus the size should grow as we insert more data
assert current_lsn - initdb_lsn <= gc_horizon
assert size > prev
collected_responses.append((current_lsn, size))
if len(collected_responses) > 2:
break
collected_responses.append(("INSERT", current_lsn, size))
while True:
with pg.cursor() as cur:
@@ -397,18 +447,15 @@ def test_single_branch_get_tenant_size_grows(
if updated == 0:
break
current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
(current_lsn, size) = get_current_consistent_size(
env, pg, size_debug_file, http_client, tenant_id, timeline_id
)
size, sizes = http_client.tenant_size_and_modelinputs(tenant_id)
prev_size = collected_responses[-1][2]
size_debug = http_client.tenant_size_debug(tenant_id)
size_debug_file.write(size_debug)
check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev_size)
prev = collected_responses[-1][1]
check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev)
collected_responses.append((current_lsn, size))
collected_responses.append(("UPDATE", current_lsn, size))
while True:
with pg.cursor() as cur:
@@ -418,40 +465,47 @@ def test_single_branch_get_tenant_size_grows(
if deleted == 0:
break
current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
(current_lsn, size) = get_current_consistent_size(
env, pg, size_debug_file, http_client, tenant_id, timeline_id
)
size = http_client.tenant_size(tenant_id)
prev = collected_responses[-1][1]
prev_size = collected_responses[-1][2]
check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev)
check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev_size)
collected_responses.append((current_lsn, size))
collected_responses.append(("DELETE", current_lsn, size))
with pg.cursor() as cur:
cur.execute("DROP TABLE t0")
current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
# The size of the tenant should still be as large as before we dropped
# the table, because the drop operation can still be undone in the PITR
# defined by gc_horizon.
(current_lsn, size) = get_current_consistent_size(
env, pg, size_debug_file, http_client, tenant_id, timeline_id
)
size = http_client.tenant_size(tenant_id)
prev = collected_responses[-1][1]
prev_size = collected_responses[-1][2]
check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev)
check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev_size)
collected_responses.append((current_lsn, size))
collected_responses.append(("DROP", current_lsn, size))
# this isn't too many lines to forget for a while. observed while
# developing these tests that locally the value is a bit more than what we
# get in the ci.
for lsn, size in collected_responses:
log.info(f"collected: {lsn}, {size}")
for phase, lsn, size in collected_responses:
log.info(f"collected: {phase}, {lsn}, {size}")
env.pageserver.stop()
env.pageserver.start()
size_after = http_client.tenant_size(tenant_id)
size_debug = http_client.tenant_size_debug(tenant_id)
size_debug_file.write(size_debug)
size_debug_file.close()
size_after = http_client.tenant_size(tenant_id)
prev = collected_responses[-1][1]
prev = collected_responses[-1][2]
assert size_after == prev, "size after restarting pageserver should not have changed"

View File

@@ -0,0 +1,34 @@
from fixtures.neon_fixtures import NeonEnv, fork_at_current_lsn
#
# Test UNLOGGED tables/relations. Postgres copies init fork contents to main
# fork to reset them during recovery. In Neon, pageserver directly sends init
# fork contents as main fork during basebackup.
#
def test_unlogged(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_unlogged", "empty")
pg = env.postgres.create_start("test_unlogged")
conn = pg.connect()
cur = conn.cursor()
cur.execute("CREATE UNLOGGED TABLE iut (id int);")
# create index to test unlogged index relation as well
cur.execute("CREATE UNIQUE INDEX iut_idx ON iut (id);")
cur.execute("INSERT INTO iut values (42);")
# create another compute to fetch inital empty contents from pageserver
fork_at_current_lsn(env, pg, "test_unlogged_basebackup", "test_unlogged")
pg2 = env.postgres.create_start(
"test_unlogged_basebackup",
)
conn2 = pg2.connect()
cur2 = conn2.cursor()
# after restart table should be empty but valid
cur2.execute("PREPARE iut_plan (int) AS INSERT INTO iut VALUES ($1)")
cur2.execute("EXECUTE iut_plan (43);")
cur2.execute("SELECT * FROM iut")
assert cur2.fetchall() == [(43,)]