Compare commits

...

19 Commits

Author SHA1 Message Date
Alexey Kondratov
1e3a6cc813 [pageserver] Do several adjustements to the find_lsn_for_timestamp
1. Use `max(find_lsn_for_timestamp, ancestor_lsn)` as a lower boundary
   for search. We use this method to figure out the branching LSN for
   new branch, but GC cutoff could be before branching point and we
   cannot create new branch with LSN < `ancestor_lsn`.

2. Search for the first commit **before** specified timestamp. This
   solves two drawbacks: i) newly created branch won't contain physical
   changes from later commits that will be marked as aborted, and will
   need to be vacuumed away; and ii) we can still figure out a
   reasonable branching LSN if there were no new commits since the
   specified timestamp.

3. Change `get_lsn_by_timestamp` API method to return LSN even if we
   only found commit **before** the specified timestamp.

Resolves #3414
2023-02-22 20:33:05 +01:00
Dmitry Rodionov
eb403da814 Use debug level for successful GET http requests (#3681)
We started rather frequently scrap some apis for metadata. This includes
layer eviction tester, I believe console does that too.

It should eliminate these logs:
https://neonprod.grafana.net/goto/rr_ace1Vz?orgId=1 (Note the rate
around 2k messages per minute)
2023-02-22 22:19:05 +03:00
Vadim Kharitonov
f3ad635911 Compile pgrouting extension 2023-02-22 20:16:11 +01:00
Vadim Kharitonov
a8d7360881 Compile hypopg extension 2023-02-22 20:14:30 +01:00
Lassi Pölönen
b0311cfdeb Change the production neon-proxy-scram update strategy to RollingUpdate (#3683)
## Describe your changes
The same change in production as was done in staging by
https://github.com/neondatabase/neon/pull/3678

## Issue ticket number and link
https://github.com/neondatabase/neon/issues/3333
2023-02-22 20:15:37 +02:00
Konstantin Knizhnik
412e0aa985 Skip largest N holes during compaction (#3597)
## Describe your changes

This is yet another attempt to address problem with storage size
ballooning #2948
Previous PR #3348 tries to address this problem by maintaining list of
holes for each layer.
The problem with this approach is that we have to load all layer on
pageserver start.
Lazy loading of layers is not possible any more.

This PR tries to collect information of N largest holes on compaction
time and exclude this holes from produced layers.
It can cause generation of larger number of layers (up to 2 times) and
producing small layers.
But it requires minimal changes in code and doesn't affect storage
format.

For graphical explanation please see thread:
https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451

## Issue ticket number and link

#2948
#3348

## Checklist before requesting a review
- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.
2023-02-22 18:28:01 +02:00
Lassi Pölönen
965b4f4ae2 Change the staging neon-proxy-scram update strategy to RollingUpdate (#3678)
## Describe your changes
When we deploy the proxy with the default Recreate strategy, there's
always some downtime and existing connections will be shut down. Change
the strategy to RollingUpdate and delay the kill signal by one week. AWS
Network Loadbalancer keeps the existing connections alive for as long as
the pods are alive, but will direct new connections to new pods.


## Issue ticket number and link
https://github.com/neondatabase/neon/issues/3333
2023-02-22 16:50:07 +02:00
Arthur Petukhovsky
95018672fa Remove safekeeper-1.ap-southeast-1.aws.neon.tech (#3671)
We migrated all timelines to
`safekeeper-3.ap-southeast-1.aws.neon.tech`, now old instance can be
removed.
2023-02-22 11:55:41 +02:00
Sergey Melnikov
2caece2077 Add -v to ansible invocations (#3670)
To get more debug output on failures
2023-02-21 23:11:52 +03:00
Joonas Koivunen
b8b8c19fb4 fix: hold permit until GetObject eof (#3663)
previously we applied the ratelimiting only up to receiving the headers
from s3, or somewhere near it. the commit adds an adapter which carries
the permit until the AsyncRead has been disposed.

fixes #3662.
2023-02-21 21:14:08 +02:00
Joonas Koivunen
225add041f calculate_logical_size: no longer use spawn_blocking (#3664)
Calculation of logical size is now async because of layer downloads, so
we shouldn't use spawn_blocking for it. Use of `spawn_blocking`
exhausted resources which are needed by `tokio::io::copy` when copying
from a stream to a file which lead to deadlock.

Fixes: #3657
2023-02-21 21:09:31 +02:00
Joonas Koivunen
5d001b1e5a chore: ignore all compaction inactive tenant errors (#3665)
these are happening in tests because of #3655 but they sure took some
time to appear.

makes the `Compaction failed, retrying in 2s: Cannot run compaction
iteration on inactive tenant` into a globally allowed error, because it
has been seen failing on different test cases.
2023-02-21 20:20:13 +02:00
Joonas Koivunen
fe462de85b fix: log download failed error (#3661)
Fixes #3659
2023-02-21 19:31:53 +02:00
Vadim Kharitonov
c0de7f5cd8 Build pg_jsonschema and pg_graphql extensions (#3535)
## Describe your changes
Layer for building pg extensions written on Rust

It required forking:
* `cargo-pgx` (in order not to catch an ABI mismatch error (`cargo-pgx`
hardcoded ABI tcdi/pgx#1032)
* `pg_jsonschema` (to use forked `cargo-pgx` version)
* `pgx-contrib-spiext` (to use forked `cargo-pgx`)
* `pg_graphql` (to use forked `cargo-pgx` and `pgx-contrib-spiext`
version)

Before the patch:

```
postgres=# create extension pg_jsonschema;
2023-02-02 17:45:23.120 UTC [35] ERROR:  incompatible library "/usr/local/lib/pg_jsonschema.so": ABI mismatch
2023-02-02 17:45:23.120 UTC [35] DETAIL:  Server has ABI "Neon Postgres", library has "PostgreSQL".
2023-02-02 17:45:23.120 UTC [35] STATEMENT:  create extension pg_jsonschema;
ERROR:  incompatible library "/usr/local/lib/pg_jsonschema.so": ABI mismatch
DETAIL:  Server has ABI "Neon Postgres", library has "PostgreSQL".
```

After 

```
postgres=# create extension pg_jsonschema;
CREATE EXTENSION
postgres=# select json_matches_schema('{"type": "object"}', '{}');
 json_matches_schema
---------------------
 t
postgres=# create extension pg_graphql;
CREATE EXTENSION
postgres=# create table book(id int primary key, title text);
CREATE TABLE
postgres=# insert into book(id, title) values (1, 'book 1');
INSERT 0 1
postgres=# select graphql.resolve($$
query {
  bookCollection {
    edges {
      node {
        id
      }
    }
  }
}
$$);
                            resolve
----------------------------------------------------------------
 {"data": {"bookCollection": {"edges": [{"node": {"id": 1}}]}}}
(1 row)
```

## Issue ticket number and link
Closes #3429, #3096

## Checklist before requesting a review
- [x] I have performed a self-review of my code.
- [x] If it is a core feature, I have added thorough tests.
- [x] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [x] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

`pg_jsonschema` extension will be available for our customers
2023-02-21 17:31:23 +01:00
Joonas Koivunen
b220ba6cd1 add random init delay for background tasks (#3655)
Fixes #3649.
2023-02-21 12:42:11 +01:00
Joonas Koivunen
7de373210d Warn when background tasks exceed their configured period (#3654)
Fixes #3648.
2023-02-21 13:02:19 +02:00
Vadim Kharitonov
5c5b03ce08 Compile xml2 extension 2023-02-21 10:34:45 +01:00
Joonas Koivunen
d7d3f451f0 Use tracing panic hook in all binaries (#3634)
Enables tracing panic hook in addition to pageserver introduced in
#3475:

- proxy
- safekeeper
- storage_broker

For proxy, a drop guard which resets the original std panic hook was
added on the first commit. Other binaries don't need it so they never
reset anything by `disarm`ing the drop guard.

The aim of the change is to make sure all panics a) have span
information b) are logged similar to other messages, not interleaved
with other messages as happens right now. Interleaving happens right now
because std prints panics to stderr, and other logging happens in
stdout. If this was handled gracefully by some utility, the log message
splitter would treat panics as belonging to the previous message because
it expects a message to start with a timestamp.

Cc: #3468
2023-02-21 10:03:55 +02:00
Keanu Ashwell
bc7d3c6476 docs: add dependency requirements for arch based systems (#3588)
This pull request adds information on building neon on Arch based system
such as Artix, Manjaro, Antergos, etc.
2023-02-20 22:51:54 +03:00
27 changed files with 619 additions and 145 deletions

View File

@@ -32,8 +32,6 @@ storage:
hosts:
safekeeper-0.ap-southeast-1.aws.neon.tech:
ansible_host: i-0d6f1dc5161eef894
safekeeper-1.ap-southeast-1.aws.neon.tech:
ansible_host: i-0e338adda8eb2d19f
safekeeper-2.ap-southeast-1.aws.neon.tech:
ansible_host: i-04fb63634e4679eb9
safekeeper-3.ap-southeast-1.aws.neon.tech:

View File

@@ -1,6 +1,21 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 604800"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon

View File

@@ -1,6 +1,21 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 604800"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon

View File

@@ -1,6 +1,22 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 604800"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon

View File

@@ -1,6 +1,22 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 604800"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon

View File

@@ -1,6 +1,22 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 604800"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon

View File

@@ -1,6 +1,22 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
deploymentStrategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 100%
maxUnavailable: 50%
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
# The pod(s) will stay in Terminating, keeps the existing connections
# but doesn't receive new ones
containerLifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 604800"]
terminationGracePeriodSeconds: 604800
image:
repository: neondatabase/neon

View File

@@ -67,7 +67,7 @@ jobs:
./get_binaries.sh
ansible-galaxy collection install sivel.toiletwater
ansible-playbook deploy.yaml -i staging.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_STAGING_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
ansible-playbook -v deploy.yaml -i staging.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_STAGING_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
rm -f neon_install.tar.gz .neon_current_version
- name: Cleanup ansible folder

View File

@@ -68,7 +68,7 @@ jobs:
./get_binaries.sh
ansible-galaxy collection install sivel.toiletwater
ansible-playbook deploy.yaml -i prod.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_PRODUCTION_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
ansible-playbook -v deploy.yaml -i prod.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_PRODUCTION_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
rm -f neon_install.tar.gz .neon_current_version
deploy-proxy-prod-new:

1
Cargo.lock generated
View File

@@ -3054,6 +3054,7 @@ dependencies = [
"hyper",
"metrics",
"once_cell",
"pin-project-lite",
"serde",
"serde_json",
"tempfile",

View File

@@ -1,3 +1,4 @@
ARG PG_VERSION
ARG REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
ARG IMAGE=rust
ARG TAG=pinned
@@ -11,7 +12,7 @@ FROM debian:bullseye-slim AS build-deps
RUN apt update && \
apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev \
zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget pkg-config libssl-dev \
libicu-dev
libicu-dev libxslt1-dev
#########################################################################################
#
@@ -23,7 +24,8 @@ FROM build-deps AS pg-build
ARG PG_VERSION
COPY vendor/postgres-${PG_VERSION} postgres
RUN cd postgres && \
./configure CFLAGS='-O2 -g3' --enable-debug --with-openssl --with-uuid=ossp --with-icu && \
./configure CFLAGS='-O2 -g3' --enable-debug --with-openssl --with-uuid=ossp --with-icu \
--with-libxml --with-libxslt && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s install && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C contrib/ install && \
# Install headers
@@ -34,7 +36,8 @@ RUN cd postgres && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgrowlocks.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/intagg.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgstattuple.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/earthdistance.control
echo 'trusted = true' >> /usr/local/pgsql/share/extension/earthdistance.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/xml2.control
#########################################################################################
#
@@ -57,10 +60,11 @@ RUN wget https://gitlab.com/Oslandia/SFCGAL/-/archive/v1.3.10/SFCGAL-v1.3.10.tar
DESTDIR=/sfcgal make install -j $(getconf _NPROCESSORS_ONLN) && \
make clean && cp -R /sfcgal/* /
ENV PATH "/usr/local/pgsql/bin:$PATH"
RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.2.tar.gz -O postgis.tar.gz && \
mkdir postgis-src && cd postgis-src && tar xvzf ../postgis.tar.gz --strip-components=1 -C . && \
./autogen.sh && \
export PATH="/usr/local/pgsql/bin:$PATH" && \
./configure --with-sfcgal=/usr/local/bin/sfcgal-config && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
cd extensions/postgis && \
@@ -74,6 +78,15 @@ RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.2.tar.gz -O postg
echo 'trusted = true' >> /usr/local/pgsql/share/extension/address_standardizer.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/address_standardizer_data_us.control
RUN wget https://github.com/pgRouting/pgrouting/archive/v3.4.2.tar.gz -O pgrouting.tar.gz && \
mkdir pgrouting-src && cd pgrouting-src && tar xvzf ../pgrouting.tar.gz --strip-components=1 -C . && \
mkdir build && \
cd build && \
cmake .. && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgrouting.control
#########################################################################################
#
# Layer "plv8-build"
@@ -178,6 +191,77 @@ RUN wget https://github.com/michelp/pgjwt/archive/9742dab1b2f297ad3811120db7b214
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgjwt.control
#########################################################################################
#
# Layer "hypopg-pg-build"
# compile hypopg extension
#
#########################################################################################
FROM build-deps AS hypopg-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/HypoPG/hypopg/archive/refs/tags/1.3.1.tar.gz -O hypopg.tar.gz && \
mkdir hypopg-src && cd hypopg-src && tar xvzf ../hypopg.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/hypopg.control
#########################################################################################
#
# Layer "rust extensions"
# This layer is used to build `pgx` deps
#
#########################################################################################
FROM build-deps AS rust-extensions-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN apt-get update && \
apt-get install -y curl libclang-dev cmake && \
useradd -ms /bin/bash nonroot -b /home
ENV HOME=/home/nonroot
ENV PATH="/home/nonroot/.cargo/bin:/usr/local/pgsql/bin/:$PATH"
USER nonroot
WORKDIR /home/nonroot
ARG PG_VERSION
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && \
chmod +x rustup-init && \
./rustup-init -y --no-modify-path --profile minimal --default-toolchain stable && \
rm rustup-init && \
cargo install --git https://github.com/vadim2404/pgx --branch neon_abi_v0.6.1 --locked cargo-pgx && \
/bin/bash -c 'cargo pgx init --pg${PG_VERSION:1}=/usr/local/pgsql/bin/pg_config'
USER root
#########################################################################################
#
# Layer "pg-jsonschema-pg-build"
# Compile "pg_jsonschema" extension
#
#########################################################################################
FROM rust-extensions-build AS pg-jsonschema-pg-build
RUN git clone --depth=1 --single-branch --branch neon_abi_v0.1.4 https://github.com/vadim2404/pg_jsonschema/ && \
cd pg_jsonschema && \
cargo pgx install --release && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_jsonschema.control
#########################################################################################
#
# Layer "pg-graphql-pg-build"
# Compile "pg_graphql" extension
#
#########################################################################################
FROM rust-extensions-build AS pg-graphql-pg-build
RUN git clone --depth=1 --single-branch --branch neon_abi_v1.1.0 https://github.com/vadim2404/pg_graphql && \
cd pg_graphql && \
cargo pgx install --release && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_graphql.control
#########################################################################################
#
# Layer "neon-pg-ext-build"
@@ -193,6 +277,9 @@ COPY --from=h3-pg-build /h3/usr /
COPY --from=unit-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=vector-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pgjwt-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-jsonschema-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-graphql-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=hypopg-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) \
@@ -255,6 +342,7 @@ COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-deb
# libicu67, locales for collations (including ICU)
# libossp-uuid16 for extension ossp-uuid
# libgeos, libgdal, libsfcgal1, libproj and libprotobuf-c1 for PostGIS
# libxml2, libxslt1.1 for xml2
RUN apt update && \
apt install --no-install-recommends -y \
locales \
@@ -266,6 +354,8 @@ RUN apt update && \
libproj19 \
libprotobuf-c1 \
libsfcgal1 \
libxml2 \
libxslt1.1 \
gdb && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8

View File

@@ -34,6 +34,11 @@ dnf install flex bison readline-devel zlib-devel openssl-devel \
libseccomp-devel perl clang cmake postgresql postgresql-contrib protobuf-compiler \
protobuf-devel
```
* On Arch based systems, these packages are needed:
```bash
pacman -S base-devel readline zlib libseccomp openssl clang \
postgresql-libs cmake postgresql protobuf
```
2. [Install Rust](https://www.rust-lang.org/tools/install)
```

View File

@@ -21,7 +21,7 @@ toml_edit.workspace = true
tracing.workspace = true
metrics.workspace = true
utils.workspace = true
pin-project-lite.workspace = true
workspace_hack.workspace = true
[dev-dependencies]

View File

@@ -20,7 +20,10 @@ use aws_sdk_s3::{
};
use aws_smithy_http::body::SdkBody;
use hyper::Body;
use tokio::{io, sync::Semaphore};
use tokio::{
io::{self, AsyncRead},
sync::Semaphore,
};
use tokio_util::io::ReaderStream;
use tracing::debug;
@@ -102,7 +105,7 @@ pub struct S3Bucket {
// Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
// Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold.
// The helps to ensure we don't exceed the thresholds.
concurrency_limiter: Semaphore,
concurrency_limiter: Arc<Semaphore>,
}
#[derive(Default)]
@@ -162,7 +165,7 @@ impl S3Bucket {
client,
bucket_name: aws_config.bucket_name.clone(),
prefix_in_bucket,
concurrency_limiter: Semaphore::new(aws_config.concurrency_limit.get()),
concurrency_limiter: Arc::new(Semaphore::new(aws_config.concurrency_limit.get())),
})
}
@@ -194,9 +197,10 @@ impl S3Bucket {
}
async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
let _guard = self
let permit = self
.concurrency_limiter
.acquire()
.clone()
.acquire_owned()
.await
.context("Concurrency limiter semaphore got closed during S3 download")
.map_err(DownloadError::Other)?;
@@ -217,9 +221,10 @@ impl S3Bucket {
let metadata = object_output.metadata().cloned().map(StorageMetadata);
Ok(Download {
metadata,
download_stream: Box::pin(io::BufReader::new(
download_stream: Box::pin(io::BufReader::new(RatelimitedAsyncRead::new(
permit,
object_output.body.into_async_read(),
)),
))),
})
}
Err(SdkError::ServiceError {
@@ -240,6 +245,32 @@ impl S3Bucket {
}
}
pin_project_lite::pin_project! {
/// An `AsyncRead` adapter which carries a permit for the lifetime of the value.
struct RatelimitedAsyncRead<S> {
permit: tokio::sync::OwnedSemaphorePermit,
#[pin]
inner: S,
}
}
impl<S: AsyncRead> RatelimitedAsyncRead<S> {
fn new(permit: tokio::sync::OwnedSemaphorePermit, inner: S) -> Self {
RatelimitedAsyncRead { permit, inner }
}
}
impl<S: AsyncRead> AsyncRead for RatelimitedAsyncRead<S> {
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
let this = self.project();
this.inner.poll_read(cx, buf)
}
}
#[async_trait::async_trait]
impl RemoteStorage for S3Bucket {
async fn list(&self) -> anyhow::Result<Vec<RemotePath>> {

View File

@@ -4,13 +4,14 @@ use anyhow::{anyhow, Context};
use hyper::header::{HeaderName, AUTHORIZATION};
use hyper::http::HeaderValue;
use hyper::{header::CONTENT_TYPE, Body, Request, Response, Server};
use hyper::{Method, StatusCode};
use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder};
use once_cell::sync::Lazy;
use routerify::ext::RequestExt;
use routerify::RequestInfo;
use routerify::{Middleware, Router, RouterBuilder, RouterService};
use tokio::task::JoinError;
use tracing::info;
use tracing;
use std::future::Future;
use std::net::TcpListener;
@@ -27,7 +28,14 @@ static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
});
async fn logger(res: Response<Body>, info: RequestInfo) -> Result<Response<Body>, ApiError> {
info!("{} {} {}", info.method(), info.uri().path(), res.status(),);
// cannot factor out the Level to avoid the repetition
// because tracing can only work with const Level
// which is not the case here
if info.method() == Method::GET && res.status() == StatusCode::OK {
tracing::debug!("{} {} {}", info.method(), info.uri().path(), res.status());
} else {
tracing::info!("{} {} {}", info.method(), info.uri().path(), res.status());
}
Ok(res)
}
@@ -203,7 +211,7 @@ pub fn serve_thread_main<S>(
where
S: Future<Output = ()> + Send + Sync,
{
info!("Starting an HTTP endpoint at {}", listener.local_addr()?);
tracing::info!("Starting an HTTP endpoint at {}", listener.local_addr()?);
// Create a Service from the router above to handle incoming requests.
let service = RouterService::new(router_builder.build().map_err(|err| anyhow!(err))?).unwrap();

View File

@@ -45,3 +45,115 @@ pub fn init(log_format: LogFormat) -> anyhow::Result<()> {
Ok(())
}
/// Disable the default rust panic hook by using `set_hook`.
///
/// For neon binaries, the assumption is that tracing is configured before with [`init`], after
/// that sentry is configured (if needed). sentry will install it's own on top of this, always
/// processing the panic before we log it.
///
/// When the return value is dropped, the hook is reverted to std default hook (prints to stderr).
/// If the assumptions about the initialization order are not held, use
/// [`TracingPanicHookGuard::disarm`] but keep in mind, if tracing is stopped, then panics will be
/// lost.
#[must_use]
pub fn replace_panic_hook_with_tracing_panic_hook() -> TracingPanicHookGuard {
std::panic::set_hook(Box::new(tracing_panic_hook));
TracingPanicHookGuard::new()
}
/// Drop guard which restores the std panic hook on drop.
///
/// Tracing should not be used when it's not configured, but we cannot really latch on to any
/// imaginary lifetime of tracing.
pub struct TracingPanicHookGuard {
act: bool,
}
impl TracingPanicHookGuard {
fn new() -> Self {
TracingPanicHookGuard { act: true }
}
/// Make this hook guard not do anything when dropped.
pub fn forget(&mut self) {
self.act = false;
}
}
impl Drop for TracingPanicHookGuard {
fn drop(&mut self) {
if self.act {
let _ = std::panic::take_hook();
}
}
}
/// Named symbol for our panic hook, which logs the panic.
fn tracing_panic_hook(info: &std::panic::PanicInfo) {
// following rust 1.66.1 std implementation:
// https://github.com/rust-lang/rust/blob/90743e7298aca107ddaa0c202a4d3604e29bfeb6/library/std/src/panicking.rs#L235-L288
let location = info.location();
let msg = match info.payload().downcast_ref::<&'static str>() {
Some(s) => *s,
None => match info.payload().downcast_ref::<String>() {
Some(s) => &s[..],
None => "Box<dyn Any>",
},
};
let thread = std::thread::current();
let thread = thread.name().unwrap_or("<unnamed>");
let backtrace = std::backtrace::Backtrace::capture();
let _entered = if let Some(location) = location {
tracing::error_span!("panic", %thread, location = %PrettyLocation(location))
} else {
// very unlikely to hit here, but the guarantees of std could change
tracing::error_span!("panic", %thread)
}
.entered();
if backtrace.status() == std::backtrace::BacktraceStatus::Captured {
// this has an annoying extra '\n' in the end which anyhow doesn't do, but we cannot really
// get rid of it as we cannot get in between of std::fmt::Formatter<'_>; we could format to
// string, maybe even to a TLS one but tracing already does that.
tracing::error!("{msg}\n\nStack backtrace:\n{backtrace}");
} else {
tracing::error!("{msg}");
}
// ensure that we log something on the panic if this hook is left after tracing has been
// unconfigured. worst case when teardown is racing the panic is to log the panic twice.
tracing::dispatcher::get_default(|d| {
if let Some(_none) = d.downcast_ref::<tracing::subscriber::NoSubscriber>() {
let location = location.map(PrettyLocation);
log_panic_to_stderr(thread, msg, location, &backtrace);
}
});
}
#[cold]
fn log_panic_to_stderr(
thread: &str,
msg: &str,
location: Option<PrettyLocation<'_, '_>>,
backtrace: &std::backtrace::Backtrace,
) {
eprintln!("panic while tracing is unconfigured: thread '{thread}' panicked at '{msg}', {location:?}\nStack backtrace:\n{backtrace}");
}
struct PrettyLocation<'a, 'b>(&'a std::panic::Location<'b>);
impl std::fmt::Display for PrettyLocation<'_, '_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}:{}:{}", self.0.file(), self.0.line(), self.0.column())
}
}
impl std::fmt::Debug for PrettyLocation<'_, '_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
<Self as std::fmt::Display>::fmt(self, f)
}
}

View File

@@ -91,9 +91,9 @@ fn main() -> anyhow::Result<()> {
// Initialize logging, which must be initialized before the custom panic hook is installed.
logging::init(conf.log_format)?;
// disable the default rust panic hook by using `set_hook`. sentry will install it's own on top
// of this, always processing the panic before we log it.
std::panic::set_hook(Box::new(tracing_panic_hook));
// mind the order required here: 1. logging, 2. panic_hook, 3. sentry.
// disarming this hook on pageserver, because we never tear down tracing.
logging::replace_panic_hook_with_tracing_panic_hook().forget();
// initialize sentry if SENTRY_DSN is provided
let _sentry_guard = init_sentry(
@@ -499,50 +499,6 @@ fn cli() -> Command {
)
}
/// Named symbol for our panic hook, which logs the panic.
fn tracing_panic_hook(info: &std::panic::PanicInfo) {
// following rust 1.66.1 std implementation:
// https://github.com/rust-lang/rust/blob/90743e7298aca107ddaa0c202a4d3604e29bfeb6/library/std/src/panicking.rs#L235-L288
let location = info.location();
let msg = match info.payload().downcast_ref::<&'static str>() {
Some(s) => *s,
None => match info.payload().downcast_ref::<String>() {
Some(s) => &s[..],
None => "Box<dyn Any>",
},
};
let thread = std::thread::current();
let thread = thread.name().unwrap_or("<unnamed>");
let backtrace = std::backtrace::Backtrace::capture();
struct PrettyLocation<'a, 'b>(&'a std::panic::Location<'b>);
impl std::fmt::Display for PrettyLocation<'_, '_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}:{}:{}", self.0.file(), self.0.line(), self.0.column())
}
}
let _entered = if let Some(location) = location {
tracing::error_span!("panic", %thread, location = %PrettyLocation(location))
} else {
// very unlikely to hit here, but the guarantees of std could change
tracing::error_span!("panic", %thread)
}
.entered();
if backtrace.status() == std::backtrace::BacktraceStatus::Captured {
// this has an annoying extra '\n' in the end which anyhow doesn't do, but we cannot really
// get rid of it as we cannot get in between of std::fmt::Formatter<'_>; we could format to
// string, maybe even to a TLS one but tracing already does that.
tracing::error!("{msg}\n\nStack backtrace:\n{backtrace}");
} else {
tracing::error!("{msg}");
}
}
#[test]
fn verify_cli() {
cli().debug_assert();

View File

@@ -330,7 +330,7 @@ async fn get_lsn_by_timestamp_handler(request: Request<Body>) -> Result<Response
let result = match result {
LsnForTimestamp::Present(lsn) => format!("{lsn}"),
LsnForTimestamp::Future(_lsn) => "future".into(),
LsnForTimestamp::Future(lsn) => format!("{lsn}"),
LsnForTimestamp::Past(_lsn) => "past".into(),
LsnForTimestamp::NoData(_lsn) => "nodata".into(),
};

View File

@@ -290,6 +290,7 @@ impl Timeline {
}
}
///
/// Locate LSN, such that all transactions that committed before
/// 'search_timestamp' are visible, but nothing newer is.
///
@@ -303,7 +304,11 @@ impl Timeline {
ctx: &RequestContext,
) -> Result<LsnForTimestamp, PageReconstructError> {
let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
let min_lsn = *gc_cutoff_lsn_guard;
// We use this method to figure out the branching LSN for new branch, but
// GC cutoff could be before branching point and we cannot create new branch
// with LSN < `ancestor_lsn`. Thus, pick the maximum of these two just to be
// on the safe side.
let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
let max_lsn = self.get_last_record_lsn();
// LSNs are always 8-byte aligned. low/mid/high represent the
@@ -327,12 +332,22 @@ impl Timeline {
)
.await?;
if cmp {
high = mid;
} else {
if !cmp {
// We either 1) found commit with timestamp **before** `search_timestamp`;
// or 2) we haven't found any commit records at all.
// Search with a larger LSN, in case 1) to try to find a more recent commit
// (but still **before** target timestamp); and in case 2) to fetch more
// SLRU segments for `clog`.
low = mid + 1;
} else {
// We found only more recent commits, search in the older range.
high = mid;
}
}
// If `found_smaller == true`, `low` is the LSN of the first commit record
// **before** the `search_timestamp` + 1 (to hit the while loop exit condition).
// Substitute 1 to get exactly the commit LSN.
let commit_lsn = Lsn((low - 1) * 8);
match (found_smaller, found_larger) {
(false, false) => {
// This can happen if no commit records have been processed yet, e.g.
@@ -340,32 +355,26 @@ impl Timeline {
Ok(LsnForTimestamp::NoData(max_lsn))
}
(true, false) => {
// Didn't find any commit timestamps larger than the request
Ok(LsnForTimestamp::Future(max_lsn))
// Only found a commit with timestamp smaller than the request.
// It's still a valid case for branch creation, return it.
// And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
// case, anyway.
Ok(LsnForTimestamp::Future(commit_lsn))
}
(false, true) => {
// Didn't find any commit timestamps smaller than the request
Ok(LsnForTimestamp::Past(max_lsn))
}
(true, true) => {
// low is the LSN of the first commit record *after* the search_timestamp,
// Back off by one to get to the point just before the commit.
//
// FIXME: it would be better to get the LSN of the previous commit.
// Otherwise, if you restore to the returned LSN, the database will
// include physical changes from later commits that will be marked
// as aborted, and will need to be vacuumed away.
Ok(LsnForTimestamp::Present(Lsn((low - 1) * 8)))
}
(true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
}
}
///
/// Subroutine of find_lsn_for_timestamp(). Returns true, if there are any
/// commits that committed after 'search_timestamp', at LSN 'probe_lsn'.
/// Subroutine of `find_lsn_for_timestamp()`. Returns `true`, if there are any
/// commits that committed after `search_timestamp`, at LSN `probe_lsn`.
///
/// Additionally, sets 'found_smaller'/'found_Larger, if encounters any commits
/// with a smaller/larger timestamp.
/// Additionally, sets `found_smaller` / `found_larger`, if encounters any commits
/// with a smaller / larger timestamp.
///
pub async fn is_latest_commit_timestamp_ge_than(
&self,

View File

@@ -3,7 +3,7 @@
use std::ops::ControlFlow;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::TENANT_TASK_EVENTS;
@@ -11,6 +11,7 @@ use crate::task_mgr;
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::mgr;
use crate::tenant::{Tenant, TenantState};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::id::TenantId;
@@ -53,37 +54,55 @@ async fn compaction_loop(tenant_id: TenantId) {
info!("starting");
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
let cancel = task_mgr::shutdown_token();
let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
let mut first = true;
loop {
trace!("waking up");
let tenant = tokio::select! {
_ = task_mgr::shutdown_watcher() => {
_ = cancel.cancelled() => {
info!("received cancellation request");
return;
return;
},
tenant_wait_result = wait_for_active_tenant(tenant_id, wait_duration) => match tenant_wait_result {
ControlFlow::Break(()) => return,
ControlFlow::Continue(tenant) => tenant,
},
};
};
let mut sleep_duration = tenant.get_compaction_period();
if sleep_duration == Duration::ZERO {
info!("automatic compaction is disabled");
// check again in 10 seconds, in case it's been enabled again.
sleep_duration = Duration::from_secs(10);
} else {
// Run compaction
if let Err(e) = tenant.compaction_iteration(&ctx).await {
sleep_duration = wait_duration;
error!("Compaction failed, retrying in {:?}: {e:?}", sleep_duration);
let period = tenant.get_compaction_period();
// TODO: we shouldn't need to await to find tenant and this could be moved outside of
// loop, #3501. There are also additional "allowed_errors" in tests.
if first {
first = false;
if random_init_delay(period, &cancel).await.is_err() {
break;
}
}
let started_at = Instant::now();
let sleep_duration = if period == Duration::ZERO {
info!("automatic compaction is disabled");
// check again in 10 seconds, in case it's been enabled again.
Duration::from_secs(10)
} else {
// Run compaction
if let Err(e) = tenant.compaction_iteration(&ctx).await {
error!("Compaction failed, retrying in {:?}: {e:?}", wait_duration);
wait_duration
} else {
period
}
};
warn_when_period_overrun(started_at.elapsed(), period, "compaction");
// Sleep
tokio::select! {
_ = task_mgr::shutdown_watcher() => {
_ = cancel.cancelled() => {
info!("received cancellation request during idling");
break;
},
@@ -105,14 +124,16 @@ async fn gc_loop(tenant_id: TenantId) {
info!("starting");
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
let cancel = task_mgr::shutdown_token();
// GC might require downloading, to find the cutoff LSN that corresponds to the
// cutoff specified as time.
let ctx = RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
let mut first = true;
loop {
trace!("waking up");
let tenant = tokio::select! {
_ = task_mgr::shutdown_watcher() => {
_ = cancel.cancelled() => {
info!("received cancellation request");
return;
},
@@ -122,27 +143,38 @@ async fn gc_loop(tenant_id: TenantId) {
},
};
let gc_period = tenant.get_gc_period();
let gc_horizon = tenant.get_gc_horizon();
let mut sleep_duration = gc_period;
if sleep_duration == Duration::ZERO {
info!("automatic GC is disabled");
// check again in 10 seconds, in case it's been enabled again.
sleep_duration = Duration::from_secs(10);
} else {
// Run gc
if gc_horizon > 0 {
if let Err(e) = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx).await
{
sleep_duration = wait_duration;
error!("Gc failed, retrying in {:?}: {e:?}", sleep_duration);
}
let period = tenant.get_gc_period();
if first {
first = false;
if random_init_delay(period, &cancel).await.is_err() {
break;
}
}
let started_at = Instant::now();
let gc_horizon = tenant.get_gc_horizon();
let sleep_duration = if period == Duration::ZERO || gc_horizon == 0 {
info!("automatic GC is disabled");
// check again in 10 seconds, in case it's been enabled again.
Duration::from_secs(10)
} else {
// Run gc
let res = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx).await;
if let Err(e) = res {
error!("Gc failed, retrying in {:?}: {e:?}", wait_duration);
wait_duration
} else {
period
}
};
warn_when_period_overrun(started_at.elapsed(), period, "gc");
// Sleep
tokio::select! {
_ = task_mgr::shutdown_watcher() => {
_ = cancel.cancelled() => {
info!("received cancellation request during idling");
break;
},
@@ -197,3 +229,49 @@ async fn wait_for_active_tenant(
}
}
}
#[derive(thiserror::Error, Debug)]
#[error("cancelled")]
pub(crate) struct Cancelled;
/// Provide a random delay for background task initialization.
///
/// This delay prevents a thundering herd of background tasks and will likely keep them running on
/// different periods for more stable load.
pub(crate) async fn random_init_delay(
period: Duration,
cancel: &CancellationToken,
) -> Result<(), Cancelled> {
use rand::Rng;
let d = {
let mut rng = rand::thread_rng();
// gen_range asserts that the range cannot be empty, which it could be because period can
// be set to zero to disable gc or compaction, so lets set it to be at least 10s.
let period = std::cmp::max(period, Duration::from_secs(10));
// semi-ok default as the source of jitter
rng.gen_range(Duration::ZERO..=period)
};
tokio::select! {
_ = cancel.cancelled() => Err(Cancelled),
_ = tokio::time::sleep(d) => Ok(()),
}
}
pub(crate) fn warn_when_period_overrun(elapsed: Duration, period: Duration, task: &str) {
// Duration::ZERO will happen because it's the "disable [bgtask]" value.
if elapsed >= period && period != Duration::ZERO {
// humantime does no significant digits clamping whereas Duration's debug is a bit more
// intelligent. however it makes sense to keep the "configuration format" for period, even
// though there's no way to output the actual config value.
warn!(
?elapsed,
period = %humantime::format_duration(period),
task,
"task iteration took longer than the configured period"
);
}
}

View File

@@ -19,6 +19,7 @@ use tracing::*;
use utils::id::TenantTimelineId;
use std::cmp::{max, min, Ordering};
use std::collections::BinaryHeap;
use std::collections::HashMap;
use std::fs;
use std::ops::{Deref, Range};
@@ -82,6 +83,25 @@ enum FlushLoopState {
Exited,
}
/// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Hole {
key_range: Range<Key>,
coverage_size: usize,
}
impl Ord for Hole {
fn cmp(&self, other: &Self) -> Ordering {
other.coverage_size.cmp(&self.coverage_size) // inverse order
}
}
impl PartialOrd for Hole {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
pub struct Timeline {
conf: &'static PageServerConf,
tenant_conf: Arc<RwLock<TenantConfOpt>>,
@@ -1770,15 +1790,9 @@ impl Timeline {
let calculation = async {
let cancel = cancel.child_token();
let ctx = ctx.attached_child();
tokio::task::spawn_blocking(move || {
// Run in a separate thread since this can do a lot of
// synchronous file IO without .await inbetween
// if there are no RemoteLayers that would require downloading.
let h = tokio::runtime::Handle::current();
h.block_on(self_calculation.calculate_logical_size(init_lsn, cancel, &ctx))
})
.await
.context("Failed to spawn calculation result task")?
self_calculation
.calculate_logical_size(init_lsn, cancel, &ctx)
.await
};
let timeline_state_cancellation = async {
loop {
@@ -1811,7 +1825,7 @@ impl Timeline {
tokio::pin!(calculation);
loop {
tokio::select! {
res = &mut calculation => { return res }
res = &mut calculation => { return res }
reason = timeline_state_cancellation => {
debug!(reason = reason, "cancelling calculation");
cancel.cancel();
@@ -2947,6 +2961,47 @@ impl Timeline {
},
)?;
// Determine N largest holes where N is number of compacted layers.
let max_holes = deltas_to_compact.len();
let last_record_lsn = self.get_last_record_lsn();
let layers = self.layers.read().unwrap(); // Is'n it better to hold original layers lock till here?
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
let min_hole_coverage_size = 3; // TODO: something more flexible?
// min-heap (reserve space for one more element added before eviction)
let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
let mut prev: Option<Key> = None;
for (next_key, _next_lsn, _size) in itertools::process_results(
deltas_to_compact.iter().map(|l| l.key_iter(ctx)),
|iter_iter| iter_iter.kmerge_by(|a, b| a.0 <= b.0),
)? {
if let Some(prev_key) = prev {
// just first fast filter
if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
let key_range = prev_key..next_key;
// Measuring hole by just subtraction of i128 representation of key range boundaries
// has not so much sense, because largest holes will corresponds field1/field2 changes.
// But we are mostly interested to eliminate holes which cause generation of excessive image layers.
// That is why it is better to measure size of hole as number of covering image layers.
let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len();
if coverage_size >= min_hole_coverage_size {
heap.push(Hole {
key_range,
coverage_size,
});
if heap.len() > max_holes {
heap.pop(); // remove smallest hole
}
}
}
}
prev = Some(next_key.next());
}
drop(layers);
let mut holes = heap.into_vec();
holes.sort_unstable_by_key(|hole| hole.key_range.start);
let mut next_hole = 0; // index of next hole in holes vector
// Merge the contents of all the input delta layers into a new set
// of delta layers, based on the current partitioning.
//
@@ -3041,14 +3096,22 @@ impl Timeline {
}
if writer.is_some() {
let written_size = writer.as_mut().unwrap().size();
// check if key cause layer overflow...
let contains_hole =
next_hole < holes.len() && key >= holes[next_hole].key_range.end;
// check if key cause layer overflow or contains hole...
if is_dup_layer
|| dup_end_lsn.is_valid()
|| written_size + key_values_total_size > target_file_size
|| contains_hole
{
// ... if so, flush previous layer and prepare to write new one
new_layers.push(writer.take().unwrap().finish(prev_key.unwrap().next())?);
writer = None;
if contains_hole {
// skip hole
next_hole += 1;
}
}
}
// Remember size of key value because at next iteration we will access next item
@@ -3745,6 +3808,7 @@ impl Timeline {
remote_layer.ongoing_download.close();
} else {
// Keep semaphore open. We'll drop the permit at the end of the function.
info!("on-demand download failed: {:?}", result.as_ref().unwrap_err());
}
// Don't treat it as an error if the task that triggered the download

View File

@@ -41,9 +41,23 @@ impl Timeline {
#[instrument(skip_all, fields(tenant_id = %self.tenant_id, timeline_id = %self.timeline_id))]
async fn eviction_task(self: Arc<Self>, cancel: CancellationToken) {
use crate::tenant::tasks::random_init_delay;
{
let policy = self.get_eviction_policy();
let period = match policy {
EvictionPolicy::LayerAccessThreshold(lat) => lat.period,
EvictionPolicy::NoEviction => Duration::from_secs(10),
};
if random_init_delay(period, &cancel).await.is_err() {
info!("shutting down");
return;
}
}
loop {
let policy = self.get_eviction_policy();
let cf = self.eviction_iteration(&policy, cancel.clone()).await;
match cf {
ControlFlow::Break(()) => break,
ControlFlow::Continue(sleep_until) => {
@@ -78,13 +92,7 @@ impl Timeline {
ControlFlow::Continue(()) => (),
}
let elapsed = start.elapsed();
if elapsed > p.period {
warn!(
configured_period = %humantime::format_duration(p.period),
last_period = %humantime::format_duration(elapsed),
"this eviction period took longer than the configured period"
);
}
crate::tenant::tasks::warn_when_period_overrun(elapsed, p.period, "eviction");
ControlFlow::Continue(start + p.period)
}
}

View File

@@ -43,6 +43,7 @@ async fn flatten_err(
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let _logging_guard = logging::init().await?;
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
info!("Version: {GIT_VERSION}");

View File

@@ -126,7 +126,12 @@ fn main() -> anyhow::Result<()> {
return Ok(());
}
// important to keep the order of:
// 1. init logging
// 2. tracing panic hook
// 3. sentry
logging::init(LogFormat::from_config(&args.log_format)?)?;
logging::replace_panic_hook_with_tracing_panic_hook().forget();
info!("version: {GIT_VERSION}");
let args_workdir = &args.datadir;

View File

@@ -424,12 +424,16 @@ async fn http1_handler(
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// initialize sentry if SENTRY_DSN is provided
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
let args = Args::parse();
// important to keep the order of:
// 1. init logging
// 2. tracing panic hook
// 3. sentry
logging::init(LogFormat::from_config(&args.log_format)?)?;
logging::replace_panic_hook_with_tracing_panic_hook().forget();
// initialize sentry if SENTRY_DSN is provided
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
info!("version: {GIT_VERSION}");
::metrics::set_build_info_metric(GIT_VERSION);

View File

@@ -2079,6 +2079,9 @@ class NeonPageserver(PgProtocol):
".*compaction_loop.*Compaction failed, retrying in.*timeline is Stopping", # When compaction checks timeline state after acquiring layer_removal_cs
".*query handler for 'pagestream.*failed: Timeline .* was not found", # postgres reconnects while timeline_delete doesn't hold the tenant's timelines.lock()
".*query handler for 'pagestream.*failed: Timeline .* is not active", # timeline delete in progress
".*task iteration took longer than the configured period.*",
# this is until #3501
".*Compaction failed, retrying in [^:]+: Cannot run compaction iteration on inactive tenant",
]
def start(

View File

@@ -17,7 +17,9 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
cur = pgmain.connect().cursor()
# Create table, and insert rows, each in a separate transaction
# Disable synchronous_commit to make this initialization go faster.
# Disable `synchronous_commit`` to make this initialization go faster.
# XXX: on my laptop this test takes 7s, and setting `synchronous_commit=off`
# doesn't change anything.
#
# Each row contains current insert LSN and the current timestamp, when
# the row was inserted.
@@ -32,20 +34,23 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
# Execute one more transaction with synchronous_commit enabled, to flush
# all the previous transactions
cur.execute("SET synchronous_commit=on")
cur.execute("INSERT INTO foo VALUES (-1)")
# Wait until WAL is received by pageserver
wait_for_last_flush_lsn(env, pgmain, env.initial_tenant, new_timeline_id)
with env.pageserver.http_client() as client:
# Check edge cases: timestamp in the future
# Check edge cases
# Timestamp is in the future
probe_timestamp = tbl[-1][1] + timedelta(hours=1)
result = client.timeline_get_lsn_by_timestamp(
env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z"
)
assert result == "future"
# We should still return LSN of the first commit before timestamp
assert result not in ["past", "nodata"]
# timestamp too the far history
# Timestamp is in the unreachable past
probe_timestamp = tbl[0][1] - timedelta(hours=10)
result = client.timeline_get_lsn_by_timestamp(
env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z"
@@ -55,10 +60,12 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
# Probe a bunch of timestamps in the valid range
for i in range(1, len(tbl), 100):
probe_timestamp = tbl[i][1]
# Call get_lsn_by_timestamp to get the LSN
lsn = client.timeline_get_lsn_by_timestamp(
env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z"
)
# Call get_lsn_by_timestamp to get the LSN
assert lsn not in ["past", "nodata"]
# Launch a new read-only node at that LSN, and check that only the rows
# that were supposed to be committed at that point in time are visible.
pg_here = env.postgres.create_start(