Compare commits

..

32 Commits

Author SHA1 Message Date
Joonas Koivunen
f8536c741f refactor: needlessly fallible
DeleteTenantFlow::should_resume_deletion -- unsure why it was originally
left like this, probably an oversight on my part while reviewing.
2023-11-27 11:42:28 +00:00
John Spray
b80b9e1c4c pageserver: remove defunct local timeline delete markers (#5699)
## Problem

Historically, we treated the presence of a timeline on local disk as
evidence that it logically exists. Since #5580 that is no longer the
case, so we can always rely on remote storage. If we restart and the
timeline is gone in remote storage, we will also purge it from local
disk: no need for a marker.

Reference on why this PR is for timeline markers and not tenant markers:
https://github.com/neondatabase/neon/issues/5080#issuecomment-1783187807

## Summary of changes

Remove code paths that read + write deletion marker for timelines.

Leave the code path that deletes these markers, just in case we deploy while
some still exist. This can be cleaned up later.
(https://github.com/neondatabase/neon/issues/5718)
2023-11-27 09:31:20 +00:00
Anastasia Lubennikova
87b8ac3ec3 Only create neon extension in postgres database; (#5918)
Create neon extension in neon schema.
2023-11-26 08:37:01 +00:00
Joonas Koivunen
6b1c4cc983 fix: long timeline create cancelled by tenant delete (#5917)
Fix the fallible vs. infallible check order with
`UninitTimeline::finish_creation` so that the incomplete timeline can be
removed. Currently the order of drop guard unwrapping causes uninit
files to be left on the pageserver, blocking tenant deletion.
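
A generic, hedged sketch of the ordering principle involved, not the actual
`UninitTimeline` code: the function names, the marker path, and the use of
`scopeguard`/`anyhow` are illustrative assumptions only.

```
use std::path::{Path, PathBuf};

// Stand-in for the step that can fail (e.g. a cancelled timeline creation).
fn fallible_finalize(_uninit_marker: &Path) -> anyhow::Result<()> {
    Ok(())
}

fn finish_creation(uninit_marker: PathBuf) -> anyhow::Result<()> {
    // Arm the cleanup guard first: if anything below fails, the half-created
    // marker file is removed when the guard drops.
    let cleanup = scopeguard::guard(uninit_marker, |p| {
        let _ = std::fs::remove_file(p);
    });
    // Run the fallible part while the guard is still armed.
    fallible_finalize(&cleanup)?;
    // Only on success do we defuse the guard and keep the file.
    let _kept = scopeguard::ScopeGuard::into_inner(cleanup);
    Ok(())
}

fn main() -> anyhow::Result<()> {
    let marker = std::env::temp_dir().join("timeline.___uninit");
    std::fs::write(&marker, b"")?;
    finish_creation(marker)
}
```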

Cc: #5914
Cc: #investigation-2023-11-23-stuck-tenant-deletion
2023-11-24 16:17:56 +00:00
Joonas Koivunen
831fad46d5 tests: fix allowed_error for compaction detecting a shutdown (#5919)
This has been causing flaky tests; see [example evidence].

Follow-up to #5883 where I forgot to fix this.

[example evidence]:
https://neon-github-public-dev.s3.amazonaws.com/reports/pr-5917/6981540065/index.html#suites/9d2450a537238135fd4007859e09aca7/6fd3556a879fa3d1
2023-11-24 16:14:32 +00:00
Joonas Koivunen
53851ea8ec fix: log cancelled request handler errors (#5915)
Noticed during an [investigation] with @problame: a major point of lost
error logging which would have sped up the investigation.

Cc: #5815

[investigation]:
https://neondb.slack.com/archives/C066ZFAJU85/p1700751858049319
2023-11-24 15:54:06 +02:00
Joonas Koivunen
044375732a test: support validating allowed_errors against a logfile (#5905)
This will make it easier to test whether an added allowed_error does in
fact match, for example against a log file from an Allure report.

```
$ python3 test_runner/fixtures/pageserver/allowed_errors.py --help
usage: allowed_errors.py [-h] [-i INPUT]

check input against pageserver global allowed_errors

optional arguments:
  -h, --help            show this help message and exit
  -i INPUT, --input INPUT
                        Pageserver logs file. Reads from stdin if no file is provided.
```

Co-authored-by: Alexander Bayandin <alexander@neon.tech>
2023-11-24 12:43:25 +00:00
Konstantin Knizhnik
ea63b43009 Check if LFC was initialized in local_cache_pages function (#5911)
## Problem

There is no check that the LFC is initialised (`lfc_max_size != 0`) in the
`local_cache_pages` function.

## Summary of changes

Add proper check.

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2023-11-24 08:23:00 +02:00
Conrad Ludgate
a56fd45f56 proxy: fix memory leak again (#5909)
## Problem

The `connections.join_next` helped, but it wasn't enough. The way I
implemented the improvement before was still faulty, but it mostly worked,
so it appeared to be working correctly.

From [`tokio::select`
docs](https://docs.rs/tokio/latest/tokio/macro.select.html):
> 4. Once an <async expression> returns a value, attempt to apply the
value to the provided <pattern>, if the pattern matches, evaluate
<handler> and return. If the pattern does not match, disable the current
branch and for the remainder of the current call to select!. Continue
from step 3.

The `connections.join_next()` future would complete and the `Some(Err(e))`
branch would be evaluated but not match (as the task would, we hope, complete
without panicking). Since the branch doesn't match, it gets disabled. The
select continues but never attempts to call `join_next` again. If we get
unlucky, more TCP connections are created than we ever attempt to `join_next`.

## Summary of changes

Replace the `Some(Err(e))` pattern with `Some(e)`. Because of the
auto-disabling feature, we don't need the `if !connections.is_empty()`
step as the `None` pattern will disable it for us.
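
A minimal, hedged sketch of the corrected pattern, not the actual proxy code
(`accept_loop` and the connection handling are made up; `Some(res)` is the
`Some(e)` pattern described above):

```
use tokio::net::TcpListener;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;

async fn accept_loop(listener: TcpListener, cancel: CancellationToken) {
    let mut connections: JoinSet<()> = JoinSet::new();
    loop {
        tokio::select! {
            // This pattern matches every completed task (panicked or not), so
            // the branch is never wrongly auto-disabled; when the set is empty,
            // join_next() yields None and tokio disables the branch for us until
            // the next loop iteration re-runs select!.
            Some(res) = connections.join_next() => {
                if let Err(e) = res {
                    eprintln!("connection task panicked: {e}");
                }
            }
            accepted = listener.accept() => {
                if let Ok((socket, _peer)) = accepted {
                    connections.spawn(async move {
                        // handle the connection; elided here
                        drop(socket);
                    });
                }
            }
            _ = cancel.cancelled() => break,
        }
    }
    // Drain the remaining connection tasks on shutdown.
    while connections.join_next().await.is_some() {}
}
```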
2023-11-23 19:11:24 +00:00
Anastasia Lubennikova
582a42762b update extension version in test_neon_extension 2023-11-23 18:53:03 +00:00
Anastasia Lubennikova
f5dfa6f140 Create extension neon in existing databases too 2023-11-23 18:53:03 +00:00
Anastasia Lubennikova
f8d9bd8d14 Add extension neon to all databases.
- Run CREATE EXTENSION neon for template1, so that it is created in all databases.
- Run ALTER EXTENSION neon in all databases, to always have the newest version of the extension in computes.
- Add test_neon_extension test
2023-11-23 18:53:03 +00:00
Anastasia Lubennikova
04e6c09f14 Add pgxn/neon/README.md 2023-11-23 18:53:03 +00:00
Arpad Müller
54327bbeec Upload initdb results to S3 (#5390)
## Problem

See #2592

## Summary of changes

Compresses the results of initdb into a .tar.zst file and uploads it
to S3, to enable its use when recovering from an LSN.

Generations should not be involved, I think, because we do this only once
at the very beginning of a timeline.

---------

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-11-23 18:11:52 +00:00
Shany Pozin
35f243e787 Move weekly release PR trigger to Monday morning (#5908) 2023-11-23 19:09:34 +02:00
Shany Pozin
b7a988ba46 Support cancellation for find_lsn_for_timestamp API (#5904)
## Problem
#5900
## Summary of changes
Added a cancellation token as a parameter in all relevant code paths and used it in the `find_lsn_for_timestamp` main loop.
2023-11-23 17:08:32 +02:00
Christian Schwarz
a0e61145c8 fix: cleanup of layers from the future can race with their re-creation (#5890)
fixes https://github.com/neondatabase/neon/issues/5878
obsoletes https://github.com/neondatabase/neon/issues/5879

Before this PR, it could happen that `load_layer_map` schedules removal of a
future image layer. A later compaction run could then re-create the same image
layer, scheduling a PUT. Due to the lack of an upload queue barrier, the PUT
and DELETE could be re-ordered. The result was an IndexPart referencing a
non-existent object.

## Summary of changes

* Add support to `pagectl` / Python tests to decode `IndexPart`
  * Rust
    * new `pagectl` Subcommand
    * `IndexPart::{from,to}_s3_bytes()` methods to internalize knowledge about encoding of `IndexPart`
  * Python
    * new `NeonCli` subclass
* Add regression test
  * Rust
    * Ability to force repartitioning; required to ensure image layer creation at last_record_lsn
  * Python
    * The regression test.
* Fix the issue
  * Insert an `UploadOp::Barrier` after scheduling the deletions.
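
A self-contained toy model of why the barrier matters (this is not the
pageserver's actual upload queue; it only illustrates the ordering guarantee):
operations scheduled before the barrier must finish before anything scheduled
after it may start.

```
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut in_flight = JoinSet::new();

    // 1. Schedule the DELETE of the future image layer (runs concurrently).
    in_flight.spawn(async { println!("DELETE future image layer") });

    // 2. Barrier: wait for everything scheduled so far to complete.
    while in_flight.join_next().await.is_some() {}

    // 3. Only now may compaction's PUT of the re-created layer be scheduled,
    //    so it can no longer be reordered ahead of the DELETE.
    in_flight.spawn(async { println!("PUT re-created image layer") });
    while in_flight.join_next().await.is_some() {}
}
```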
2023-11-23 13:33:41 +00:00
Konstantin Knizhnik
6afbadc90e LFC fixes + statistics (#5727)
## Problem

## Summary of changes

See #5500

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2023-11-23 08:59:19 +02:00
Anastasia Lubennikova
2a12e9c46b Add documentation for our sample pre-commit hook (#5868) 2023-11-22 12:04:36 +00:00
Christian Schwarz
9e3c07611c logging: support output to stderr (#5896)
(part of the getpage benchmarking epic #5771)

The plan is to make the benchmarking tool log on stderr and emit results
as JSON on stdout. That way, the test suite can simply capture
stdout and json.loads() it, while interactive users of the benchmarking
tool have a reasonable experience as well.

Existing logging users continue to print to stdout, so this change
should be a no-op both functionally and performance-wise.
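
A hedged sketch of the intended usage per the description above (the benchmark
name and result field are made up; `utils::logging` is this repo's logging
helper, and `serde_json` is assumed to be available):

```
use utils::logging::{self, LogFormat, Output, TracingErrorLayerEnablement};

fn main() -> anyhow::Result<()> {
    // Human-readable logs go to stderr via the new Output knob...
    logging::init(
        LogFormat::Plain,
        TracingErrorLayerEnablement::Disabled,
        Output::Stderr,
    )?;
    tracing::info!("starting getpage benchmark");

    // ...while machine-readable results go to stdout as JSON, so a test
    // harness can json.loads() the captured stdout directly.
    let results = serde_json::json!({ "getpage_latency_us_p50": 123 });
    println!("{results}");
    Ok(())
}
```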
2023-11-22 11:08:35 +00:00
Christian Schwarz
d353fa1998 refer to our rust-postgres.git fork by branch name (#5894)
This way, `cargo update -p tokio-postgres` just works. The `Cargo.toml`
communicates more clearly that we're referring to the `main` branch. And
the git revision is still pinned in `Cargo.lock`.
2023-11-22 10:58:27 +00:00
Joonas Koivunen
0d10992e46 Cleanup compact_level0_phase1 fsyncing (#5852)
While reviewing code I noticed a scary `layer_paths.pop().unwrap()`, then
realized this should be further asyncified, something I forgot to do
when I switched `compact_level0_phase1` back to async in #4938.

This keeps the double-fsync for new deltas, as #4749 is still unsolved.
2023-11-21 15:30:40 +02:00
Arpad Müller
3e131bb3d7 Update Rust to 1.74.0 (#5873)
[Release notes](https://github.com/rust-lang/rust/releases/tag/1.74.0).
2023-11-21 11:41:41 +01:00
Sasha Krassovsky
81b2cefe10 Disallow CREATE DATABASE WITH OWNER neon_superuser (#5887)
## Problem
Currently, the control plane doesn't know about neon_superuser, so if a user
creates a database with owner neon_superuser, it causes an exception when the
control plane tries to forward it. It is also currently possible to ALTER ROLE
neon_superuser.

## Summary of changes
Disallow creating a database with owner neon_superuser. This is probably
fine, since I don't think you can create a database owned by a normal
superuser. Also forbid altering neon_superuser.
2023-11-20 22:39:47 +00:00
Christian Schwarz
d2ca410919 build: back to opt-level=0 in debug builds, for faster compile times (#5751)
This change brings down incremental compilation for me
from > 1min to 10s (and this is a pretty old Ryzen 1700X).

More details: "incremental compilation" here means changing one character
in the `failed to read value from offset` string in `image_layer.rs`.
The command for incremental compilation is `cargo build_testing`.
The system on which I got these numbers uses `mold` via
`~/.cargo/config.toml`.

As a bonus, `rust-gdb` is now at least a little fun again.

Some tests were timing out in debug builds due to these changes, so
this PR makes them skip in debug builds. We run tests with both debug and
release builds, so the loss of coverage is marginal.

---------

Co-authored-by: Alexander Bayandin <alexander@neon.tech>
2023-11-20 15:41:37 +01:00
Joonas Koivunen
d98ac04136 chore(background_tasks): missed allowed_error change, logging change (#5883)
- I am always confused by the log for the error wait time; now it will
be logged as `2s` or `2.0s`, not `2.0`
- fix a missed string change introduced in #5881 [evidence]

[evidence]:
https://neon-github-public-dev.s3.amazonaws.com/reports/main/6921062837/index.html#suites/f9eba3cfdb71aa6e2b54f6466222829b/87897fe1ddee3825
2023-11-20 07:33:17 +00:00
Joonas Koivunen
ac08072d2e fix(layer): VirtualFile opening and read errors can be caused by contention (#5880)
A very low number of layer loads have been wrongly marked as permanent
failures, because I did not remember that `VirtualFile::open` or reading could
fail transiently due to contention. Return separate errors for transient and
persistent failures from `{Delta,Image}LayerInner::load`.

Includes drive-by comment changes.

The implementation looks quite ugly because the same type serves as both the
inner (operation error) and the outer (critical error), but with the
alternatives I tried I did not find a better way.
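
A hedged sketch of the distinction being drawn (the names and classification
rules are illustrative assumptions, not the actual pageserver types):

```
// Transient failures (e.g. open/read hitting contention) are worth retrying;
// permanent ones (e.g. a corrupt layer) are not.
#[derive(Debug)]
enum LoadError {
    Transient(std::io::Error),
    Permanent(String),
}

fn classify(err: std::io::Error) -> LoadError {
    use std::io::ErrorKind;
    match err.kind() {
        // Contention / resource-exhaustion style errors get retried.
        ErrorKind::WouldBlock | ErrorKind::TimedOut | ErrorKind::Interrupted => {
            LoadError::Transient(err)
        }
        _ => LoadError::Permanent(err.to_string()),
    }
}

fn main() {
    let e = std::io::Error::from(std::io::ErrorKind::TimedOut);
    println!("{:?}", classify(e));
}
```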
2023-11-19 14:57:39 +00:00
John Spray
d22dce2e31 pageserver: shut down idle walredo processes (#5877)
The longer a pageserver runs, the more walredo processes it accumulates
from tenants that are touched intermittently (e.g. by availability
checks). This can lead to getting OOM killed.

Changes:
- Add an Instant recording the last use of the walredo process for a
tenant
- After the compaction iteration in the background task, check for idleness
and stop the walredo process if it has been idle for more than 10x the
compaction period.
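
A minimal, self-contained sketch of that idle check (the struct and field names
are made up; dropping the slot stands in for stopping the child process):

```
use std::time::{Duration, Instant};

struct WalRedoProcessSlot {
    last_used: Instant,
}

struct WalRedoManager {
    process: Option<WalRedoProcessSlot>,
}

impl WalRedoManager {
    // Called after each background compaction iteration: stop the walredo
    // process if it has been idle longer than the given timeout.
    fn maybe_quiesce(&mut self, idle_timeout: Duration) {
        if let Some(slot) = &self.process {
            if slot.last_used.elapsed() >= idle_timeout {
                self.process = None;
            }
        }
    }
}

fn main() {
    let mut mgr = WalRedoManager {
        process: Some(WalRedoProcessSlot { last_used: Instant::now() }),
    };
    let compaction_period = Duration::from_secs(20);
    // The PR uses 10x the compaction period as the idle threshold.
    mgr.maybe_quiesce(10 * compaction_period);
    assert!(mgr.process.is_some(), "not idle long enough yet");
}
```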

Cc: #3620

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
Co-authored-by: Shany Pozin <shany@neon.tech>
2023-11-19 14:21:16 +00:00
Joonas Koivunen
3b3f040be3 fix(background_tasks): first backoff, compaction error stacktraces (#5881)
The first compaction/GC error backoff starts from 0, which is less than the
2s it was before #5672. This is now fixed to be the intended 2**n.
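
A hedged sketch of the intended schedule (this function is illustrative, not
the actual `utils::backoff` implementation): the n-th consecutive error waits
about 2^n seconds, capped, so the very first retry already waits 2s instead of
0s.

```
fn error_backoff_secs(consecutive_errors: u32, max_secs: f64) -> f64 {
    if consecutive_errors == 0 {
        return 0.0; // no error yet: no wait
    }
    // 2^1 = 2s after the first error, 2^2 = 4s after the second, ...
    2f64.powi(consecutive_errors as i32).min(max_secs)
}

fn main() {
    assert_eq!(error_backoff_secs(1, 300.0), 2.0);
    assert_eq!(error_backoff_secs(2, 300.0), 4.0);
    assert_eq!(error_backoff_secs(12, 300.0), 300.0); // capped
}
```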

Additionally, noticed that `compaction_iteration` creating an `anyhow::Error`
via `into()` always captures a stacktrace, even if we had a stacktrace-carrying
anyhow error inside the CompactionError, because there is no stable API for
querying that.
2023-11-19 14:16:31 +00:00
Em Sharnoff
cad0dca4b8 compute_ctl: Remove deprecated flag --file-cache-on-disk (#5622)
See neondatabase/cloud#7516 for more.
2023-11-18 12:43:54 +01:00
Em Sharnoff
5d13a2e426 Improve error message when neon.max_cluster_size reached (#4173)
Changes the error message encountered when the `neon.max_cluster_size`
limit is reached. Reasoning is that this is user-visible, and so should
*probably* use language that's closer to what users are familiar with.
2023-11-16 21:51:26 +00:00
khanova
0c243faf96 Proxy log pid hack (#5869)
## Problem

Improve observability for the compute node.

## Summary of changes

Log pid from the compute node. Doesn't work with pgbouncer.
2023-11-16 20:46:23 +00:00
79 changed files with 2104 additions and 533 deletions

View File

@@ -1,17 +1,3 @@
# The binaries are really slow, if you compile them in 'dev' mode with the defaults.
# Enable some optimizations even in 'dev' mode, to make tests faster. The basic
# optimizations enabled by "opt-level=1" don't affect debuggability too much.
#
# See https://www.reddit.com/r/rust/comments/gvrgca/this_is_a_neat_trick_for_getting_good_runtime/
#
[profile.dev.package."*"]
# Set the default for dependencies in Development mode.
opt-level = 3
[profile.dev]
# Turn on a small amount of optimization in Development mode.
opt-level = 1
[build]
# This is only present for local builds, as it will be overridden
# by the RUSTDOCFLAGS env var in CI.

View File

@@ -2,7 +2,7 @@ name: Create Release Branch
on:
schedule:
- cron: '0 7 * * 5'
- cron: '0 6 * * 1'
workflow_dispatch:
jobs:

View File

@@ -9,6 +9,24 @@ refactoring, additional comments, and so forth. Let's try to raise the
bar, and clean things up as we go. Try to leave code in a better shape
than it was before.
## Pre-commit hook
We have a sample pre-commit hook in `pre-commit.py`.
To set it up, run:
```bash
ln -s ../../pre-commit.py .git/hooks/pre-commit
```
This will run following checks on staged files before each commit:
- `rustfmt`
- checks for python files, see [obligatory checks](/docs/sourcetree.md#obligatory-checks).
There is also a separate script `./run_clippy.sh` that runs `cargo clippy` on the whole project
and `./scripts/reformat` that runs all formatting tools to ensure the project is up to date.
If you want to skip the hook, run `git commit` with `--no-verify` option.
## Submitting changes
1. Get at least one +1 on your PR before you push.

17
Cargo.lock generated
View File

@@ -193,6 +193,8 @@ dependencies = [
"memchr",
"pin-project-lite",
"tokio",
"zstd",
"zstd-safe",
]
[[package]]
@@ -2905,6 +2907,8 @@ dependencies = [
"git-version",
"pageserver",
"postgres_ffi",
"serde",
"serde_json",
"svg_fmt",
"tokio",
"utils",
@@ -3221,7 +3225,7 @@ dependencies = [
[[package]]
name = "postgres"
version = "0.19.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#988d0ddb4184c408fa7fc1bd0ecca7993c02978f"
dependencies = [
"bytes",
"fallible-iterator",
@@ -3234,7 +3238,7 @@ dependencies = [
[[package]]
name = "postgres-native-tls"
version = "0.5.0"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#988d0ddb4184c408fa7fc1bd0ecca7993c02978f"
dependencies = [
"native-tls",
"tokio",
@@ -3245,7 +3249,7 @@ dependencies = [
[[package]]
name = "postgres-protocol"
version = "0.6.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#988d0ddb4184c408fa7fc1bd0ecca7993c02978f"
dependencies = [
"base64 0.20.0",
"byteorder",
@@ -3263,7 +3267,7 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#988d0ddb4184c408fa7fc1bd0ecca7993c02978f"
dependencies = [
"bytes",
"fallible-iterator",
@@ -4933,7 +4937,7 @@ dependencies = [
[[package]]
name = "tokio-postgres"
version = "0.7.7"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#988d0ddb4184c408fa7fc1bd0ecca7993c02978f"
dependencies = [
"async-trait",
"byteorder",
@@ -6031,6 +6035,9 @@ dependencies = [
"tungstenite",
"url",
"uuid",
"zstd",
"zstd-safe",
"zstd-sys",
]
[[package]]

View File

@@ -37,7 +37,7 @@ license = "Apache-2.0"
[workspace.dependencies]
anyhow = { version = "1.0", features = ["backtrace"] }
arc-swap = "1.6"
async-compression = { version = "0.4.0", features = ["tokio", "gzip"] }
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
azure_core = "0.16"
azure_identity = "0.16"
azure_storage = "0.16"
@@ -165,11 +165,11 @@ env_logger = "0.10"
log = "0.4"
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
## Other git libraries
heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending
@@ -206,7 +206,7 @@ tonic-build = "0.9"
# This is only needed for proxy's tests.
# TODO: we should probably fork `tokio-postgres-rustls` instead.
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
################# Binary contents sections

View File

@@ -714,23 +714,6 @@ RUN wget https://github.com/pksunkara/pgx_ulid/archive/refs/tags/v0.1.3.tar.gz -
cargo pgrx install --release && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/ulid.control
#########################################################################################
#
# Layer "pg-wait-sampling-pg-build"
# compile pg_wait_sampling extension
#
#########################################################################################
FROM build-deps AS pg-wait-sampling-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ENV PATH "/usr/local/pgsql/bin/:$PATH"
RUN wget https://github.com/postgrespro/pg_wait_sampling/archive/refs/tags/v1.1.5.tar.gz -O pg_wait_sampling.tar.gz && \
echo 'a03da6a413f5652ce470a3635ed6ebba528c74cb26aa4cfced8aff8a8441f81ec6dd657ff62cd6ce96a4e6ce02cad9f2519ae9525367ece60497aa20faafde5c pg_wait_sampling.tar.gz' | sha512sum -c && \
mkdir pg_wait_sampling-src && cd pg_wait_sampling-src && tar xvzf ../pg_wait_sampling.tar.gz --strip-components=1 -C . && \
make USE_PGXS=1 -j $(getconf _NPROCESSORS_ONLN) && \
make USE_PGXS=1 -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_wait_sampling.control
#########################################################################################
#
# Layer "neon-pg-ext-build"
@@ -767,7 +750,6 @@ COPY --from=rdkit-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-uuidv7-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-roaringbitmap-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-embedding-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-wait-sampling-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) \

View File

@@ -479,13 +479,6 @@ fn cli() -> clap::Command {
)
.value_name("FILECACHE_CONNSTR"),
)
.arg(
// DEPRECATED, NO LONGER DOES ANYTHING.
// See https://github.com/neondatabase/cloud/issues/7516
Arg::new("file-cache-on-disk")
.long("file-cache-on-disk")
.action(clap::ArgAction::SetTrue),
)
}
#[test]

View File

@@ -698,6 +698,7 @@ impl ComputeNode {
handle_role_deletions(spec, self.connstr.as_str(), &mut client)?;
handle_grants(spec, &mut client, self.connstr.as_str())?;
handle_extensions(spec, &mut client)?;
handle_extension_neon(&mut client)?;
create_availability_check_data(&mut client)?;
// 'Close' connection
@@ -742,6 +743,7 @@ impl ComputeNode {
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
handle_grants(&spec, &mut client, self.connstr.as_str())?;
handle_extensions(&spec, &mut client)?;
handle_extension_neon(&mut client)?;
}
// 'Close' connection

View File

@@ -674,3 +674,30 @@ pub fn handle_extensions(spec: &ComputeSpec, client: &mut Client) -> Result<()>
Ok(())
}
/// Run CREATE and ALTER EXTENSION neon UPDATE for postgres database
#[instrument(skip_all)]
pub fn handle_extension_neon(client: &mut Client) -> Result<()> {
info!("handle extension neon");
let mut query = "CREATE SCHEMA IF NOT EXISTS neon";
client.simple_query(query)?;
query = "CREATE EXTENSION IF NOT EXISTS neon WITH SCHEMA neon";
info!("create neon extension with query: {}", query);
client.simple_query(query)?;
query = "ALTER EXTENSION neon SET SCHEMA neon";
info!("alter neon extension schema with query: {}", query);
client.simple_query(query)?;
// this will be a no-op if extension is already up to date,
// which may happen in two cases:
// - extension was just installed
// - extension was already installed and is up to date
let query = "ALTER EXTENSION neon UPDATE";
info!("update neon extension schema with query: {}", query);
client.simple_query(query)?;
Ok(())
}

View File

@@ -286,6 +286,7 @@ async fn main() -> anyhow::Result<()> {
logging::init(
LogFormat::Plain,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stdout,
)?;
let args = Cli::parse();

View File

@@ -487,8 +487,15 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
.copied()
.context("Failed to parse postgres version from the argument string")?;
let timeline_info =
pageserver.timeline_create(tenant_id, None, None, None, Some(pg_version))?;
let new_timeline_id_opt = parse_timeline_id(create_match)?;
let timeline_info = pageserver.timeline_create(
tenant_id,
new_timeline_id_opt,
None,
None,
Some(pg_version),
)?;
let new_timeline_id = timeline_info.timeline_id;
let last_record_lsn = timeline_info.last_record_lsn;
@@ -1308,6 +1315,7 @@ fn cli() -> Command {
.subcommand(Command::new("create")
.about("Create a new blank timeline")
.arg(tenant_id_arg.clone())
.arg(timeline_id_arg.clone())
.arg(branch_name_arg.clone())
.arg(pg_version_arg.clone())
)

View File

@@ -281,6 +281,7 @@ fn ensure_logging_ready() {
utils::logging::init(
utils::logging::LogFormat::Test,
utils::logging::TracingErrorLayerEnablement::Disabled,
utils::logging::Output::Stdout,
)
.expect("logging init failed");
});

View File

@@ -210,6 +210,7 @@ fn ensure_logging_ready() {
utils::logging::init(
utils::logging::LogFormat::Test,
utils::logging::TracingErrorLayerEnablement::Disabled,
utils::logging::Output::Stdout,
)
.expect("logging init failed");
});

View File

@@ -0,0 +1,21 @@
#!/bin/bash
# like restore_from_wal.sh, but takes existing initdb.tar.zst
set -euxo pipefail
PG_BIN=$1
WAL_PATH=$2
DATA_DIR=$3
PORT=$4
echo "port=$PORT" >> "$DATA_DIR"/postgresql.conf
echo "shared_preload_libraries='\$libdir/neon_rmgr.so'" >> "$DATA_DIR"/postgresql.conf
REDO_POS=0x$("$PG_BIN"/pg_controldata -D "$DATA_DIR" | grep -F "REDO location"| cut -c 42-)
declare -i WAL_SIZE=$REDO_POS+114
"$PG_BIN"/pg_ctl -D "$DATA_DIR" -l "$DATA_DIR/logfile.log" start
"$PG_BIN"/pg_ctl -D "$DATA_DIR" -l "$DATA_DIR/logfile.log" stop -m immediate
cp "$DATA_DIR"/pg_wal/000000010000000000000001 .
cp "$WAL_PATH"/* "$DATA_DIR"/pg_wal/
for partial in "$DATA_DIR"/pg_wal/*.partial ; do mv "$partial" "${partial%.partial}" ; done
dd if=000000010000000000000001 of="$DATA_DIR"/pg_wal/000000010000000000000001 bs=$WAL_SIZE count=1 conv=notrunc
rm -f 000000010000000000000001

View File

@@ -66,9 +66,17 @@ pub enum TracingErrorLayerEnablement {
EnableWithRustLogFilter,
}
/// Where the logging should output to.
#[derive(Clone, Copy)]
pub enum Output {
Stdout,
Stderr,
}
pub fn init(
log_format: LogFormat,
tracing_error_layer_enablement: TracingErrorLayerEnablement,
output: Output,
) -> anyhow::Result<()> {
// We fall back to printing all spans at info-level or above if
// the RUST_LOG environment variable is not set.
@@ -85,7 +93,12 @@ pub fn init(
let log_layer = tracing_subscriber::fmt::layer()
.with_target(false)
.with_ansi(false)
.with_writer(std::io::stdout);
.with_writer(move || -> Box<dyn std::io::Write> {
match output {
Output::Stdout => Box::new(std::io::stdout()),
Output::Stderr => Box::new(std::io::stderr()),
}
});
let log_layer = match log_format {
LogFormat::Json => log_layer.json().boxed(),
LogFormat::Plain => log_layer.boxed(),

View File

@@ -18,3 +18,5 @@ tokio.workspace = true
utils.workspace = true
svg_fmt.workspace = true
workspace_hack.workspace = true
serde.workspace = true
serde_json.workspace = true

View File

@@ -0,0 +1,38 @@
use std::collections::HashMap;
use anyhow::Context;
use camino::Utf8PathBuf;
use pageserver::tenant::remote_timeline_client::index::IndexLayerMetadata;
use pageserver::tenant::storage_layer::LayerFileName;
use pageserver::tenant::{metadata::TimelineMetadata, IndexPart};
use utils::lsn::Lsn;
#[derive(clap::Subcommand)]
pub(crate) enum IndexPartCmd {
Dump { path: Utf8PathBuf },
}
pub(crate) async fn main(cmd: &IndexPartCmd) -> anyhow::Result<()> {
match cmd {
IndexPartCmd::Dump { path } => {
let bytes = tokio::fs::read(path).await.context("read file")?;
let des: IndexPart = IndexPart::from_s3_bytes(&bytes).context("deserialize")?;
#[derive(serde::Serialize)]
struct Output<'a> {
layer_metadata: &'a HashMap<LayerFileName, IndexLayerMetadata>,
disk_consistent_lsn: Lsn,
timeline_metadata: &'a TimelineMetadata,
}
let output = Output {
layer_metadata: &des.layer_metadata,
disk_consistent_lsn: des.get_disk_consistent_lsn(),
timeline_metadata: &des.metadata,
};
let output = serde_json::to_string_pretty(&output).context("serialize output")?;
println!("{output}");
Ok(())
}
}
}

View File

@@ -5,11 +5,13 @@
//! Separate, `metadata` subcommand allows to print and update pageserver's metadata file.
mod draw_timeline_dir;
mod index_part;
mod layer_map_analyzer;
mod layers;
use camino::{Utf8Path, Utf8PathBuf};
use clap::{Parser, Subcommand};
use index_part::IndexPartCmd;
use layers::LayerCmd;
use pageserver::{
context::{DownloadBehavior, RequestContext},
@@ -38,6 +40,8 @@ struct CliOpts {
#[derive(Subcommand)]
enum Commands {
Metadata(MetadataCmd),
#[command(subcommand)]
IndexPart(IndexPartCmd),
PrintLayerFile(PrintLayerFileCmd),
DrawTimeline {},
AnalyzeLayerMap(AnalyzeLayerMapCmd),
@@ -83,6 +87,9 @@ async fn main() -> anyhow::Result<()> {
Commands::Metadata(cmd) => {
handle_metadata(&cmd)?;
}
Commands::IndexPart(cmd) => {
index_part::main(&cmd).await?;
}
Commands::DrawTimeline {} => {
draw_timeline_dir::main()?;
}

View File

@@ -103,7 +103,11 @@ fn main() -> anyhow::Result<()> {
} else {
TracingErrorLayerEnablement::Disabled
};
logging::init(conf.log_format, tracing_error_layer_enablement)?;
logging::init(
conf.log_format,
tracing_error_layer_enablement,
logging::Output::Stdout,
)?;
// mind the order required here: 1. logging, 2. panic_hook, 3. sentry.
// disarming this hook on pageserver, because we never tear down tracing.
@@ -621,6 +625,7 @@ fn start_pageserver(
conf.synthetic_size_calculation_interval,
conf.id,
local_disk_storage,
cancel,
metrics_ctx,
)
.instrument(info_span!("metrics_collection"))

View File

@@ -3,7 +3,7 @@
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::tasks::BackgroundLoopKind;
use crate::tenant::{mgr, LogicalSizeCalculationCause};
use crate::tenant::{mgr, LogicalSizeCalculationCause, PageReconstructError};
use camino::Utf8PathBuf;
use consumption_metrics::EventType;
use pageserver_api::models::TenantState;
@@ -12,6 +12,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::id::NodeId;
@@ -37,6 +38,7 @@ type RawMetric = (MetricsKey, (EventType, u64));
type Cache = HashMap<MetricsKey, (EventType, u64)>;
/// Main thread that serves metrics collection
#[allow(clippy::too_many_arguments)]
pub async fn collect_metrics(
metric_collection_endpoint: &Url,
metric_collection_interval: Duration,
@@ -44,6 +46,7 @@ pub async fn collect_metrics(
synthetic_size_calculation_interval: Duration,
node_id: NodeId,
local_disk_storage: Utf8PathBuf,
cancel: CancellationToken,
ctx: RequestContext,
) -> anyhow::Result<()> {
if _cached_metric_collection_interval != Duration::ZERO {
@@ -63,9 +66,13 @@ pub async fn collect_metrics(
"synthetic size calculation",
false,
async move {
calculate_synthetic_size_worker(synthetic_size_calculation_interval, &worker_ctx)
.instrument(info_span!("synthetic_size_worker"))
.await?;
calculate_synthetic_size_worker(
synthetic_size_calculation_interval,
&cancel,
&worker_ctx,
)
.instrument(info_span!("synthetic_size_worker"))
.await?;
Ok(())
},
);
@@ -241,6 +248,7 @@ async fn reschedule(
/// Caclculate synthetic size for each active tenant
async fn calculate_synthetic_size_worker(
synthetic_size_calculation_interval: Duration,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<()> {
info!("starting calculate_synthetic_size_worker");
@@ -272,7 +280,12 @@ async fn calculate_synthetic_size_worker(
// Same for the loop that fetches computed metrics.
// By using the same limiter, we centralize metrics collection for "start" and "finished" counters,
// which turns out is really handy to understand the system.
if let Err(e) = tenant.calculate_synthetic_size(cause, ctx).await {
if let Err(e) = tenant.calculate_synthetic_size(cause, cancel, ctx).await {
if let Some(PageReconstructError::Cancelled) =
e.downcast_ref::<PageReconstructError>()
{
return Ok(());
}
error!("failed to calculate synthetic size for tenant {tenant_id}: {e:#}");
}
}

View File

@@ -513,6 +513,7 @@ impl DeletionQueueClient {
) -> Result<(), DeletionQueueError> {
if current_generation.is_none() {
debug!("Enqueuing deletions in legacy mode, skipping queue");
let mut layer_paths = Vec::new();
for (layer, generation) in layers {
layer_paths.push(remote_layer_path(

View File

@@ -6,6 +6,7 @@ use std::str::FromStr;
use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use enumset::EnumSet;
use futures::TryFutureExt;
use humantime::format_rfc3339;
use hyper::header;
@@ -42,6 +43,7 @@ use crate::tenant::mgr::{
};
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::timeline::CompactFlags;
use crate::tenant::timeline::Timeline;
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, TenantSharedResources};
use crate::{config::PageServerConf, tenant::mgr};
@@ -548,7 +550,7 @@ async fn timeline_detail_handler(
async fn get_lsn_by_timestamp_handler(
request: Request<Body>,
_cancel: CancellationToken,
cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
@@ -564,7 +566,9 @@ async fn get_lsn_by_timestamp_handler(
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
let result = timeline.find_lsn_for_timestamp(timestamp_pg, &ctx).await?;
let result = timeline
.find_lsn_for_timestamp(timestamp_pg, &cancel, &ctx)
.await?;
if version.unwrap_or(0) > 1 {
#[derive(serde::Serialize)]
@@ -840,7 +844,7 @@ async fn tenant_delete_handler(
/// without modifying anything anyway.
async fn tenant_size_handler(
request: Request<Body>,
_cancel: CancellationToken,
cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
@@ -856,6 +860,7 @@ async fn tenant_size_handler(
.gather_size_inputs(
retention_period,
LogicalSizeCalculationCause::TenantSizeHandler,
&cancel,
&ctx,
)
.await
@@ -1240,7 +1245,7 @@ async fn failpoints_handler(
// Run GC immediately on given timeline.
async fn timeline_gc_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
@@ -1249,7 +1254,7 @@ async fn timeline_gc_handler(
let gc_req: TimelineGcRequest = json_request(&mut request).await?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let wait_task_done = mgr::immediate_gc(tenant_id, timeline_id, gc_req, &ctx).await?;
let wait_task_done = mgr::immediate_gc(tenant_id, timeline_id, gc_req, cancel, &ctx).await?;
let gc_result = wait_task_done
.await
.context("wait for gc task")
@@ -1268,11 +1273,15 @@ async fn timeline_compact_handler(
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
let mut flags = EnumSet::empty();
if Some(true) == parse_query_param::<_, bool>(&request, "force_repartition")? {
flags |= CompactFlags::ForceRepartition;
}
async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
timeline
.compact(&cancel, &ctx)
.compact(&cancel, flags, &ctx)
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
json_response(StatusCode::OK, ())
@@ -1289,6 +1298,11 @@ async fn timeline_checkpoint_handler(
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
let mut flags = EnumSet::empty();
if Some(true) == parse_query_param::<_, bool>(&request, "force_repartition")? {
flags |= CompactFlags::ForceRepartition;
}
async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
@@ -1297,7 +1311,7 @@ async fn timeline_checkpoint_handler(
.await
.map_err(ApiError::InternalServerError)?;
timeline
.compact(&cancel, &ctx)
.compact(&cancel, flags, &ctx)
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
@@ -1675,8 +1689,24 @@ where
let token_cloned = token.clone();
let result = handler(r, token).await;
if token_cloned.is_cancelled() {
info!("Cancelled request finished");
// dropguard has executed: we will never turn this result into response.
//
// at least temporarily do {:?} logging; these failures are rare enough but
// could hide difficult errors.
match &result {
Ok(response) => {
let status = response.status();
info!(%status, "Cancelled request finished successfully")
}
Err(e) => error!("Cancelled request finished with an error: {e:?}"),
}
}
// only logging for cancelled panicked request handlers is the tracing_panic_hook,
// which should suffice.
//
// there is still a chance to lose the result due to race between
// returning from here and the actual connection closing happening
// before outer task gets to execute. leaving that up for #5815.
result
}
.in_current_span(),

View File

@@ -3,18 +3,25 @@
//! a neon Timeline.
//!
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::task::{self, Poll};
use anyhow::{bail, ensure, Context, Result};
use async_compression::{tokio::write::ZstdEncoder, zstd::CParameter, Level};
use bytes::Bytes;
use camino::Utf8Path;
use futures::StreamExt;
use tokio::io::{AsyncRead, AsyncReadExt};
use nix::NixPath;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio_tar::Archive;
use tokio_tar::Builder;
use tokio_tar::HeaderMode;
use tracing::*;
use walkdir::WalkDir;
use crate::context::RequestContext;
use crate::pgdatadir_mapping::*;
use crate::tenant::remote_timeline_client::INITDB_PATH;
use crate::tenant::Timeline;
use crate::walingest::WalIngest;
use crate::walrecord::DecodedWALRecord;
@@ -33,7 +40,9 @@ use utils::lsn::Lsn;
pub fn get_lsn_from_controlfile(path: &Utf8Path) -> Result<Lsn> {
// Read control file to extract the LSN
let controlfile_path = path.join("global").join("pg_control");
let controlfile = ControlFileData::decode(&std::fs::read(controlfile_path)?)?;
let controlfile_buf = std::fs::read(&controlfile_path)
.with_context(|| format!("reading controlfile: {controlfile_path}"))?;
let controlfile = ControlFileData::decode(&controlfile_buf)?;
let lsn = controlfile.checkPoint;
Ok(Lsn(lsn))
@@ -618,3 +627,108 @@ async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> Result<Bytes>
reader.read_to_end(&mut buf).await?;
Ok(Bytes::from(buf))
}
/// An in-memory buffer implementing `AsyncWrite`, inserting yields every now and then
///
/// The number of yields is bounded by above by the number of times poll_write is called,
/// so calling it with 8 KB chunks and 8 MB chunks gives the same number of yields in total.
/// This is an explicit choice as the `YieldingVec` is meant to give the async executor
/// breathing room between units of CPU intensive preparation of buffers to be written.
/// Once a write call is issued, the whole buffer has been prepared already, so there is no
/// gain in splitting up the memcopy further.
struct YieldingVec {
yield_budget: usize,
// the buffer written into
buf: Vec<u8>,
}
impl YieldingVec {
fn new() -> Self {
Self {
yield_budget: 0,
buf: Vec::new(),
}
}
// Whether we should yield for a read operation of given size
fn should_yield(&mut self, add_buf_len: usize) -> bool {
// Set this limit to a small value so that we are a
// good async citizen and yield repeatedly (but not
// too often for many small writes to cause many yields)
const YIELD_DIST: usize = 1024;
let target_buf_len = self.buf.len() + add_buf_len;
let ret = self.yield_budget / YIELD_DIST < target_buf_len / YIELD_DIST;
if self.yield_budget < target_buf_len {
self.yield_budget += add_buf_len;
}
ret
}
}
impl AsyncWrite for YieldingVec {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
if self.should_yield(buf.len()) {
cx.waker().wake_by_ref();
return Poll::Pending;
}
self.get_mut().buf.extend_from_slice(buf);
Poll::Ready(Ok(buf.len()))
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(
self: Pin<&mut Self>,
_cx: &mut task::Context<'_>,
) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}
}
pub async fn create_tar_zst(pgdata_path: &Utf8Path) -> Result<Vec<u8>> {
let mut paths = Vec::new();
for entry in WalkDir::new(pgdata_path) {
let entry = entry?;
let metadata = entry.metadata().expect("error getting dir entry metadata");
// Also allow directories so that we also get empty directories
if !(metadata.is_file() || metadata.is_dir()) {
continue;
}
let path = entry.into_path();
paths.push(path);
}
// Do a sort to get a more consistent listing
paths.sort_unstable();
let zstd = ZstdEncoder::with_quality_and_params(
YieldingVec::new(),
Level::Default,
&[CParameter::enable_long_distance_matching(true)],
);
let mut builder = Builder::new(zstd);
// Use reproducible header mode
builder.mode(HeaderMode::Deterministic);
for path in paths {
let rel_path = path.strip_prefix(pgdata_path)?;
if rel_path.is_empty() {
// The top directory should not be compressed,
// the tar crate doesn't like that
continue;
}
builder.append_path_with_name(&path, rel_path).await?;
}
let mut zstd = builder.into_inner().await?;
zstd.shutdown().await?;
let compressed = zstd.into_inner();
let compressed_len = compressed.buf.len();
const INITDB_TAR_ZST_WARN_LIMIT: usize = 2_000_000;
if compressed_len > INITDB_TAR_ZST_WARN_LIMIT {
warn!("compressed {INITDB_PATH} size of {compressed_len} is above limit {INITDB_TAR_ZST_WARN_LIMIT}.");
}
Ok(compressed.buf)
}

View File

@@ -638,7 +638,7 @@ const STORAGE_IO_TIME_BUCKETS: &[f64] = &[
///
/// Operations:
/// - open ([`std::fs::OpenOptions::open`])
/// - close (dropping [`std::fs::File`])
/// - close (dropping [`crate::virtual_file::VirtualFile`])
/// - close-by-replace (close by replacement algorithm)
/// - read (`read_at`)
/// - write (`write_at`)

View File

@@ -21,6 +21,7 @@ use serde::{Deserialize, Serialize};
use std::collections::{hash_map, HashMap, HashSet};
use std::ops::ControlFlow;
use std::ops::Range;
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};
use utils::bin_ser::DeserializeError;
use utils::{bin_ser::BeSer, lsn::Lsn};
@@ -365,6 +366,7 @@ impl Timeline {
pub async fn find_lsn_for_timestamp(
&self,
search_timestamp: TimestampTz,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<LsnForTimestamp, PageReconstructError> {
let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
@@ -383,6 +385,9 @@ impl Timeline {
let mut found_smaller = false;
let mut found_larger = false;
while low < high {
if cancel.is_cancelled() {
return Err(PageReconstructError::Cancelled);
}
// cannot overflow, high and low are both smaller than u64::MAX / 2
let mid = (high + low) / 2;

View File

@@ -12,7 +12,9 @@
//!
use anyhow::{bail, Context};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use enumset::EnumSet;
use futures::FutureExt;
use pageserver_api::models::TimelineState;
use remote_storage::DownloadError;
@@ -23,6 +25,7 @@ use tokio::sync::watch;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::backoff;
use utils::completion;
use utils::crashsafe::path_with_suffix_extension;
use utils::fs_ext;
@@ -291,6 +294,16 @@ impl From<harness::TestRedoManager> for WalRedoManager {
}
impl WalRedoManager {
pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
match self {
Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout),
#[cfg(test)]
Self::Test(_) => {
// Not applicable to test redo manager
}
}
}
pub async fn request_redo(
&self,
key: crate::repository::Key,
@@ -605,21 +618,12 @@ impl Tenant {
// Remote preload is complete.
drop(remote_load_completion);
let pending_deletion = {
match DeleteTenantFlow::should_resume_deletion(
conf,
preload.as_ref().map(|p| p.deleting).unwrap_or(false),
&tenant_clone,
)
.await
{
Ok(should_resume_deletion) => should_resume_deletion,
Err(err) => {
make_broken(&tenant_clone, anyhow::anyhow!(err));
return Ok(());
}
}
};
let pending_deletion = DeleteTenantFlow::should_resume_deletion(
conf,
preload.as_ref().map(|p| p.deleting).unwrap_or(false),
&tenant_clone,
)
.await;
info!("pending_deletion {}", pending_deletion.is_some());
@@ -1619,6 +1623,7 @@ impl Tenant {
target_timeline_id: Option<TimelineId>,
horizon: u64,
pitr: Duration,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<GcResult> {
// Don't start doing work during shutdown
@@ -1641,7 +1646,7 @@ impl Tenant {
}
}
self.gc_iteration_internal(target_timeline_id, horizon, pitr, ctx)
self.gc_iteration_internal(target_timeline_id, horizon, pitr, cancel, ctx)
.await
}
@@ -1649,22 +1654,16 @@ impl Tenant {
/// This function is periodically called by compactor task.
/// Also it can be explicitly requested per timeline through page server
/// api's 'compact' command.
pub async fn compaction_iteration(
async fn compaction_iteration(
&self,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<()> {
// Don't start doing work during shutdown
if let TenantState::Stopping { .. } = self.current_state() {
) -> anyhow::Result<(), timeline::CompactionError> {
// Don't start doing work during shutdown, or when broken, we do not need those in the logs
if !self.is_active() {
return Ok(());
}
// We should only be called once the tenant has activated.
anyhow::ensure!(
self.is_active(),
"Cannot run compaction iteration on inactive tenant"
);
{
let conf = self.tenant_conf.read().unwrap();
if !conf.location.may_delete_layers_hint() || !conf.location.may_upload_layers_hint() {
@@ -1695,7 +1694,7 @@ impl Tenant {
for (timeline_id, timeline) in &timelines_to_compact {
timeline
.compact(cancel, ctx)
.compact(cancel, EnumSet::empty(), ctx)
.instrument(info_span!("compact_timeline", %timeline_id))
.await?;
}
@@ -1850,6 +1849,7 @@ impl Tenant {
});
})
};
// test_long_timeline_create_then_tenant_delete is leaning on this message
tracing::info!("Waiting for timelines...");
while let Some(res) = js.join_next().await {
match res {
@@ -2564,14 +2564,30 @@ impl Tenant {
target_timeline_id: Option<TimelineId>,
horizon: u64,
pitr: Duration,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<GcResult> {
let mut totals: GcResult = Default::default();
let now = Instant::now();
let gc_timelines = self
.refresh_gc_info_internal(target_timeline_id, horizon, pitr, ctx)
.await?;
let gc_timelines = match self
.refresh_gc_info_internal(target_timeline_id, horizon, pitr, cancel, ctx)
.await
{
Ok(result) => result,
Err(e) => {
if let Some(PageReconstructError::Cancelled) =
e.downcast_ref::<PageReconstructError>()
{
// Handle cancellation
totals.elapsed = now.elapsed();
return Ok(totals);
} else {
// Propagate other errors
return Err(e);
}
}
};
crate::failpoint_support::sleep_millis_async!(
"gc_iteration_internal_after_getting_gc_timelines"
@@ -2595,7 +2611,7 @@ impl Tenant {
// See comments in [`Tenant::branch_timeline`] for more information
// about why branch creation task can run concurrently with timeline's GC iteration.
for timeline in gc_timelines {
if task_mgr::is_shutdown_requested() {
if task_mgr::is_shutdown_requested() || cancel.is_cancelled() {
// We were requested to shut down. Stop and return with the progress we
// made.
break;
@@ -2615,6 +2631,7 @@ impl Tenant {
/// This is usually executed as part of periodic gc, but can now be triggered more often.
pub async fn refresh_gc_info(
&self,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<Vec<Arc<Timeline>>> {
// since this method can now be called at different rates than the configured gc loop, it
@@ -2626,7 +2643,7 @@ impl Tenant {
// refresh all timelines
let target_timeline_id = None;
self.refresh_gc_info_internal(target_timeline_id, horizon, pitr, ctx)
self.refresh_gc_info_internal(target_timeline_id, horizon, pitr, cancel, ctx)
.await
}
@@ -2635,6 +2652,7 @@ impl Tenant {
target_timeline_id: Option<TimelineId>,
horizon: u64,
pitr: Duration,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<Vec<Arc<Timeline>>> {
// grab mutex to prevent new timelines from being created here.
@@ -2708,7 +2726,7 @@ impl Tenant {
.map(|&x| x.1)
.collect();
timeline
.update_gc_info(branchpoints, cutoff, pitr, ctx)
.update_gc_info(branchpoints, cutoff, pitr, cancel, ctx)
.await?;
gc_timelines.push(timeline);
@@ -2871,7 +2889,7 @@ impl Tenant {
}
/// - run initdb to init temporary instance and get bootstrap data
/// - after initialization complete, remove the temp dir.
/// - after initialization completes, tar up the temp dir and upload it to S3.
///
/// The caller is responsible for activating the returned timeline.
async fn bootstrap_timeline(
@@ -2912,6 +2930,30 @@ impl Tenant {
let pgdata_path = &initdb_path;
let pgdata_lsn = import_datadir::get_lsn_from_controlfile(pgdata_path)?.align();
// Upload the created data dir to S3
if let Some(storage) = &self.remote_storage {
let pgdata_zstd = import_datadir::create_tar_zst(pgdata_path).await?;
let pgdata_zstd = Bytes::from(pgdata_zstd);
backoff::retry(
|| async {
self::remote_timeline_client::upload_initdb_dir(
storage,
&self.tenant_id,
&timeline_id,
pgdata_zstd.clone(),
)
.await
},
|_| false,
3,
u32::MAX,
"persist_initdb_tar_zst",
// TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
backoff::Cancel::new(CancellationToken::new(), || unreachable!()),
)
.await?;
}
// Import the contents of the data directory at the initial checkpoint
// LSN, and any WAL after that.
// Initdb lsn will be equal to last_record_lsn which will be set after import.
@@ -3121,6 +3163,7 @@ impl Tenant {
// (only if it is shorter than the real cutoff).
max_retention_period: Option<u64>,
cause: LogicalSizeCalculationCause,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<size::ModelInputs> {
let logical_sizes_at_once = self
@@ -3143,6 +3186,7 @@ impl Tenant {
max_retention_period,
&mut shared_cache,
cause,
cancel,
ctx,
)
.await
@@ -3155,9 +3199,10 @@ impl Tenant {
pub async fn calculate_synthetic_size(
&self,
cause: LogicalSizeCalculationCause,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<u64> {
let inputs = self.gather_size_inputs(None, cause, ctx).await?;
let inputs = self.gather_size_inputs(None, cause, cancel, ctx).await?;
let size = inputs.calculate()?;
@@ -3500,6 +3545,7 @@ pub(crate) mod harness {
// enable it in case the tests exercise code paths that use
// debug_assert_current_span_has_tenant_and_timeline_id
logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
logging::Output::Stdout,
)
.expect("Failed to init test logging")
});
@@ -3928,7 +3974,13 @@ mod tests {
// and compaction works. But it does set the 'cutoff' point so that the cross check
// below should fail.
tenant
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
.gc_iteration(
Some(TIMELINE_ID),
0x10,
Duration::ZERO,
&CancellationToken::new(),
&ctx,
)
.await?;
// try to branch at lsn 25, should fail because we already garbage collected the data
@@ -4031,7 +4083,13 @@ mod tests {
tline.set_broken("test".to_owned());
tenant
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
.gc_iteration(
Some(TIMELINE_ID),
0x10,
Duration::ZERO,
&CancellationToken::new(),
&ctx,
)
.await?;
// The branchpoints should contain all timelines, even ones marked
@@ -4077,7 +4135,13 @@ mod tests {
.expect("Should have a local timeline");
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
tenant
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
.gc_iteration(
Some(TIMELINE_ID),
0x10,
Duration::ZERO,
&CancellationToken::new(),
&ctx,
)
.await?;
assert!(newtline.get(*TEST_KEY, Lsn(0x25), &ctx).await.is_ok());
@@ -4105,7 +4169,13 @@ mod tests {
// run gc on parent
tenant
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
.gc_iteration(
Some(TIMELINE_ID),
0x10,
Duration::ZERO,
&CancellationToken::new(),
&ctx,
)
.await?;
// Check that the data is still accessible on the branch.
@@ -4294,7 +4364,9 @@ mod tests {
drop(writer);
tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;
let writer = tline.writer().await;
writer
@@ -4309,7 +4381,9 @@ mod tests {
drop(writer);
tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;
let writer = tline.writer().await;
writer
@@ -4324,7 +4398,9 @@ mod tests {
drop(writer);
tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;
let writer = tline.writer().await;
writer
@@ -4339,7 +4415,9 @@ mod tests {
drop(writer);
tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;
assert_eq!(
tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
@@ -4407,10 +4485,18 @@ mod tests {
let cutoff = tline.get_last_record_lsn();
tline
.update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
.update_gc_info(
Vec::new(),
cutoff,
Duration::ZERO,
&CancellationToken::new(),
&ctx,
)
.await?;
tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;
tline.gc().await?;
}
@@ -4487,10 +4573,18 @@ mod tests {
// Perform a cycle of flush, compact, and GC
let cutoff = tline.get_last_record_lsn();
tline
.update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
.update_gc_info(
Vec::new(),
cutoff,
Duration::ZERO,
&CancellationToken::new(),
&ctx,
)
.await?;
tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;
tline.gc().await?;
}
@@ -4577,10 +4671,18 @@ mod tests {
// Perform a cycle of flush, compact, and GC
let cutoff = tline.get_last_record_lsn();
tline
.update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
.update_gc_info(
Vec::new(),
cutoff,
Duration::ZERO,
&CancellationToken::new(),
&ctx,
)
.await?;
tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;
tline.gc().await?;
}

View File

@@ -361,25 +361,17 @@ impl DeleteTenantFlow {
conf: &'static PageServerConf,
remote_mark_exists: bool,
tenant: &Tenant,
) -> Result<Option<DeletionGuard>, DeleteTenantError> {
let acquire = |t: &Tenant| {
) -> Option<DeletionGuard> {
let tenant_id = tenant.tenant_id;
if remote_mark_exists || conf.tenant_deleted_mark_file_path(&tenant_id).exists() {
Some(
Arc::clone(&t.delete_progress)
Arc::clone(&tenant.delete_progress)
.try_lock_owned()
.expect("we're the only owner during init"),
)
};
if remote_mark_exists {
return Ok(acquire(tenant));
}
let tenant_id = tenant.tenant_id;
// Check local mark first, if its there there is no need to go to s3 to check whether remote one exists.
if conf.tenant_deleted_mark_file_path(&tenant_id).exists() {
Ok(acquire(tenant))
} else {
Ok(None)
None
}
}

View File

@@ -1944,6 +1944,7 @@ pub(crate) async fn immediate_gc(
tenant_id: TenantId,
timeline_id: TimelineId,
gc_req: TimelineGcRequest,
cancel: CancellationToken,
ctx: &RequestContext,
) -> Result<tokio::sync::oneshot::Receiver<Result<GcResult, anyhow::Error>>, ApiError> {
let guard = TENANTS.read().unwrap();
@@ -1970,7 +1971,7 @@ pub(crate) async fn immediate_gc(
async move {
fail::fail_point!("immediate_gc_task_pre");
let result = tenant
.gc_iteration(Some(timeline_id), gc_horizon, pitr, &ctx)
.gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
.instrument(info_span!("manual_gc", %tenant_id, %timeline_id))
.await;
// FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it

View File

@@ -190,6 +190,7 @@ use chrono::{NaiveDateTime, Utc};
use scopeguard::ScopeGuard;
use tokio_util::sync::CancellationToken;
pub(crate) use upload::upload_initdb_dir;
use utils::backoff::{
self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
};
@@ -249,6 +250,8 @@ pub(crate) const FAILED_REMOTE_OP_RETRIES: u32 = 10;
// retries. Uploads and deletions are retried forever, though.
pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
pub(crate) const INITDB_PATH: &str = "initdb.tar.zst";
pub enum MaybeDeletedIndexPart {
IndexPart(IndexPart),
Deleted(IndexPart),
@@ -816,7 +819,7 @@ impl RemoteTimelineClient {
let mut receiver = {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
self.schedule_barrier(upload_queue)
self.schedule_barrier0(upload_queue)
};
if receiver.changed().await.is_err() {
@@ -825,7 +828,14 @@ impl RemoteTimelineClient {
Ok(())
}
fn schedule_barrier(
pub(crate) fn schedule_barrier(self: &Arc<Self>) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
self.schedule_barrier0(upload_queue);
Ok(())
}
fn schedule_barrier0(
self: &Arc<Self>,
upload_queue: &mut UploadQueueInitialized,
) -> tokio::sync::watch::Receiver<()> {
@@ -1229,16 +1239,18 @@ impl RemoteTimelineClient {
}
res
}
UploadOp::Delete(delete) => self
.deletion_queue_client
.push_layers(
self.tenant_id,
self.timeline_id,
self.generation,
delete.layers.clone(),
)
.await
.map_err(|e| anyhow::anyhow!(e)),
UploadOp::Delete(delete) => {
pausable_failpoint!("before-delete-layer-pausable");
self.deletion_queue_client
.push_layers(
self.tenant_id,
self.timeline_id,
self.generation,
delete.layers.clone(),
)
.await
.map_err(|e| anyhow::anyhow!(e))
}
UploadOp::Barrier(_) => {
// unreachable. Barrier operations are handled synchronously in
// launch_queued_tasks
@@ -1528,6 +1540,13 @@ pub fn remote_layer_path(
RemotePath::from_string(&path).expect("Failed to construct path")
}
pub fn remote_initdb_archive_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> RemotePath {
RemotePath::from_string(&format!(
"tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{INITDB_PATH}"
))
.expect("Failed to construct path")
}
pub fn remote_index_path(
tenant_id: &TenantId,
timeline_id: &TimelineId,

View File

@@ -128,6 +128,14 @@ impl IndexPart {
pub fn get_disk_consistent_lsn(&self) -> Lsn {
self.disk_consistent_lsn
}
pub fn from_s3_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
serde_json::from_slice::<IndexPart>(bytes)
}
pub fn to_s3_bytes(&self) -> serde_json::Result<Vec<u8>> {
serde_json::to_vec(self)
}
}
impl TryFrom<&UploadQueueInitialized> for IndexPart {
@@ -201,7 +209,7 @@ mod tests {
deleted_at: None,
};
let part = serde_json::from_str::<IndexPart>(example).unwrap();
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}
@@ -239,7 +247,7 @@ mod tests {
deleted_at: None,
};
let part = serde_json::from_str::<IndexPart>(example).unwrap();
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}
@@ -279,7 +287,7 @@ mod tests {
"2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap())
};
let part = serde_json::from_str::<IndexPart>(example).unwrap();
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}
@@ -323,7 +331,7 @@ mod tests {
deleted_at: None,
};
let empty_layers_parsed = serde_json::from_str::<IndexPart>(empty_layers_json).unwrap();
let empty_layers_parsed = IndexPart::from_s3_bytes(empty_layers_json.as_bytes()).unwrap();
assert_eq!(empty_layers_parsed, expected);
}
@@ -361,7 +369,7 @@ mod tests {
"2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap())
};
let part = serde_json::from_str::<IndexPart>(example).unwrap();
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}
}

View File
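`from_s3_bytes`/`to_s3_bytes` are thin wrappers over `serde_json`, but they give the on-S3 encoding a single definition that both the upload path and the tests now go through. A minimal sketch of the same pattern, with a made-up struct standing in for `IndexPart`:

```rust
use serde::{Deserialize, Serialize};

// Illustrative stand-in for IndexPart; the real struct has more fields.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct ExampleIndexPart {
    version: u32,
    disk_consistent_lsn: String,
}

impl ExampleIndexPart {
    // Single place that decides how the index is encoded for S3.
    fn from_s3_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
        serde_json::from_slice(bytes)
    }

    fn to_s3_bytes(&self) -> serde_json::Result<Vec<u8>> {
        serde_json::to_vec(self)
    }
}

fn main() -> Result<(), serde_json::Error> {
    let part = ExampleIndexPart {
        version: 1,
        disk_consistent_lsn: "0/16960E8".to_string(),
    };
    // Round-trip through the S3 representation.
    let bytes = part.to_s3_bytes()?;
    let decoded = ExampleIndexPart::from_s3_bytes(&bytes)?;
    assert_eq!(part, decoded);
    Ok(())
}
```

With the helpers in place, a future change to the wire format is exercised by the same code path that production uses, which is exactly what the updated tests rely on.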

@@ -1,6 +1,7 @@
//! Helper functions to upload files to remote storage with a RemoteStorage
use anyhow::{bail, Context};
use bytes::Bytes;
use camino::Utf8Path;
use fail::fail_point;
use std::io::ErrorKind;
@@ -9,7 +10,9 @@ use tokio::fs;
use super::Generation;
use crate::{
config::PageServerConf,
tenant::remote_timeline_client::{index::IndexPart, remote_index_path, remote_path},
tenant::remote_timeline_client::{
index::IndexPart, remote_index_path, remote_initdb_archive_path, remote_path,
},
};
use remote_storage::GenericRemoteStorage;
use utils::id::{TenantId, TimelineId};
@@ -33,8 +36,9 @@ pub(super) async fn upload_index_part<'a>(
});
pausable_failpoint!("before-upload-index-pausable");
let index_part_bytes =
serde_json::to_vec(&index_part).context("serialize index part file into bytes")?;
let index_part_bytes = index_part
.to_s3_bytes()
.context("serialize index part file into bytes")?;
let index_part_size = index_part_bytes.len();
let index_part_bytes = tokio::io::BufReader::new(std::io::Cursor::new(index_part_bytes));
@@ -103,3 +107,22 @@ pub(super) async fn upload_timeline_layer<'a>(
Ok(())
}
/// Uploads the given `initdb` data to the remote storage.
pub(crate) async fn upload_initdb_dir(
storage: &GenericRemoteStorage,
tenant_id: &TenantId,
timeline_id: &TimelineId,
initdb_dir: Bytes,
) -> anyhow::Result<()> {
tracing::trace!("uploading initdb dir");
let size = initdb_dir.len();
let bytes = tokio::io::BufReader::new(std::io::Cursor::new(initdb_dir));
let remote_path = remote_initdb_archive_path(tenant_id, timeline_id);
storage
.upload_storage_object(bytes, size, &remote_path)
.await
.with_context(|| format!("upload initdb dir for '{tenant_id} / {timeline_id}'"))
}

View File

@@ -6,6 +6,7 @@ use std::sync::Arc;
use anyhow::{bail, Context};
use tokio::sync::oneshot::error::RecvError;
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
use crate::context::RequestContext;
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
@@ -113,11 +114,12 @@ pub(super) async fn gather_inputs(
max_retention_period: Option<u64>,
logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
cause: LogicalSizeCalculationCause,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<ModelInputs> {
// refresh is needed to update gc related pitr_cutoff and horizon_cutoff
tenant
.refresh_gc_info(ctx)
.refresh_gc_info(cancel, ctx)
.await
.context("Failed to refresh gc_info before gathering inputs")?;

View File
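Several hunks in this compare thread a `&CancellationToken` down into previously uncancellable paths (`immediate_gc`, `gather_inputs`, `update_gc_info`, `find_lsn_for_timestamp`). A minimal sketch of the `tokio_util::sync::CancellationToken` API being threaded here, independent of the pageserver types:

```rust
use std::time::Duration;
use tokio_util::sync::CancellationToken;

// A long-running step that periodically checks for cancellation.
async fn gather_inputs(cancel: &CancellationToken) -> anyhow::Result<()> {
    for _ in 0..10 {
        if cancel.is_cancelled() {
            anyhow::bail!("cancelled");
        }
        // Simulated unit of work, raced against cancellation.
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_millis(50)) => {}
            _ = cancel.cancelled() => anyhow::bail!("cancelled"),
        }
    }
    Ok(())
}

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    let child = cancel.child_token();

    let task = tokio::spawn(async move { gather_inputs(&child).await });

    // Cancel shortly after starting; the worker observes it at the next check.
    tokio::time::sleep(Duration::from_millis(120)).await;
    cancel.cancel();

    println!("result: {:?}", task.await.unwrap());
}
```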

@@ -289,7 +289,9 @@ impl DeltaLayer {
async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
let path = self.path();
let loaded = DeltaLayerInner::load(&path, None, ctx).await?;
let loaded = DeltaLayerInner::load(&path, None, ctx)
.await
.and_then(|res| res)?;
// not production code
let actual_filename = path.file_name().unwrap().to_owned();
@@ -610,18 +612,28 @@ impl Drop for DeltaLayerWriter {
}
impl DeltaLayerInner {
/// Returns nested result following Result<Result<_, OpErr>, Critical>:
/// - inner has the success or transient failure
/// - outer has the permanent failure
pub(super) async fn load(
path: &Utf8Path,
summary: Option<Summary>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path)
.await
.with_context(|| format!("Failed to open file '{path}'"))?;
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path).await {
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0, ctx).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
let summary_blk = match file.read_blk(0, ctx).await {
Ok(blk) => blk,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
};
// TODO: this should be an assertion instead; see ImageLayerInner::load
let actual_summary =
Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
if let Some(mut expected_summary) = summary {
// production code path
@@ -636,11 +648,11 @@ impl DeltaLayerInner {
}
}
Ok(DeltaLayerInner {
Ok(Ok(DeltaLayerInner {
file,
index_start_blk: actual_summary.index_start_blk,
index_root_blk: actual_summary.index_root_blk,
})
}))
}
pub(super) async fn get_value_reconstruct_data(

View File

@@ -249,7 +249,9 @@ impl ImageLayer {
async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
let path = self.path();
let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, ctx).await?;
let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, ctx)
.await
.and_then(|res| res)?;
// not production code
let actual_filename = path.file_name().unwrap().to_owned();
@@ -295,18 +297,31 @@ impl ImageLayer {
}
impl ImageLayerInner {
/// Returns nested result following Result<Result<_, OpErr>, Critical>:
/// - inner has the success or transient failure
/// - outer has the permanent failure
pub(super) async fn load(
path: &Utf8Path,
lsn: Lsn,
summary: Option<Summary>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path).await {
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0, ctx).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
let summary_blk = match file.read_blk(0, ctx).await {
Ok(blk) => blk,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
};
// a wrong length is the only way this could fail, so it's not actually likely at all unless
// read_blk returns a wrong-sized block.
//
// TODO: confirm and make this into assertion
let actual_summary =
Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
if let Some(mut expected_summary) = summary {
// production code path
@@ -322,12 +337,12 @@ impl ImageLayerInner {
}
}
Ok(ImageLayerInner {
Ok(Ok(ImageLayerInner {
index_start_blk: actual_summary.index_start_blk,
index_root_blk: actual_summary.index_root_blk,
lsn,
file,
})
}))
}
pub(super) async fn get_value_reconstruct_data(

View File

@@ -868,6 +868,9 @@ impl LayerInner {
}
Ok((Err(e), _permit)) => {
// FIXME: this should be with the spawned task and be cancellation sensitive
//
// while we should not need this, this backoff has turned out to be useful with
// a bug of unexpectedly deleted remote layer file (#5787).
let consecutive_failures =
self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
@@ -1196,7 +1199,7 @@ impl DownloadedLayer {
));
delta_layer::DeltaLayerInner::load(&owner.path, summary, ctx)
.await
.map(LayerKind::Delta)
.map(|res| res.map(LayerKind::Delta))
} else {
let lsn = owner.desc.image_layer_lsn();
let summary = Some(image_layer::Summary::expected(
@@ -1207,23 +1210,32 @@ impl DownloadedLayer {
));
image_layer::ImageLayerInner::load(&owner.path, lsn, summary, ctx)
.await
.map(LayerKind::Image)
}
// this will be a permanent failure
.context("load layer");
.map(|res| res.map(LayerKind::Image))
};
if let Err(e) = res.as_ref() {
LAYER_IMPL_METRICS.inc_permanent_loading_failures();
// TODO(#5815): we are not logging all errors, so temporarily log them here as well
tracing::error!("layer loading failed permanently: {e:#}");
match res {
Ok(Ok(layer)) => Ok(Ok(layer)),
Ok(Err(transient)) => Err(transient),
Err(permanent) => {
LAYER_IMPL_METRICS.inc_permanent_loading_failures();
// TODO(#5815): we are not logging all errors, so temporarily log them **once**
// here as well
let permanent = permanent.context("load layer");
tracing::error!("layer loading failed permanently: {permanent:#}");
Ok(Err(permanent))
}
}
res
};
self.kind.get_or_init(init).await.as_ref().map_err(|e| {
// errors are not cloneable, so all we can do is stringify them
// test_broken_timeline matches this string
anyhow::anyhow!("layer loading failed: {e:#}")
})
self.kind
.get_or_try_init(init)
// return transient errors using `?`
.await?
.as_ref()
.map_err(|e| {
// errors are not cloneable, so all we can do is stringify them
// test_broken_timeline matches this string
anyhow::anyhow!("layer loading failed: {e:#}")
})
}
async fn get_value_reconstruct_data(

View File
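`DeltaLayerInner::load` and `ImageLayerInner::load` above return a nested `Result<Result<_, _>, _>`: the outer error is permanent, the inner one transient, and the caller in `layer.rs` caches the former while letting the latter bubble up for retry. A stripped-down sketch of the convention, with made-up error cases standing in for the real layer-loading logic:

```rust
struct LayerInner;

/// Outer error: permanent (e.g. a corrupt summary block) -- retrying will not help.
/// Inner error: transient (e.g. the file is missing and must be re-downloaded).
fn load(path: &str) -> anyhow::Result<Result<LayerInner, anyhow::Error>> {
    let bytes = match std::fs::read(path) {
        Ok(b) => b,
        // Open/read failures are treated as transient here.
        Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
    };
    // A failed "summary" check is treated as permanent.
    if !bytes.starts_with(b"SUMMARY") {
        anyhow::bail!("deserialize first block");
    }
    Ok(Ok(LayerInner))
}

// Mirror of the caller pattern in layer.rs: cache permanent failures as a value,
// bubble up transient ones so the load can be retried later.
fn init(path: &str) -> anyhow::Result<Result<LayerInner, anyhow::Error>> {
    match load(path) {
        Ok(Ok(layer)) => Ok(Ok(layer)),
        // Transient: return as the outer Err so it is NOT cached and can be retried.
        Ok(Err(transient)) => Err(transient),
        // Permanent: record it and cache it.
        Err(permanent) => Ok(Err(permanent.context("load layer"))),
    }
}

fn main() {
    match init("delta_layer_file") {
        Ok(Ok(_)) => println!("loaded"),
        Ok(Err(permanent)) => println!("permanent failure: {permanent:#}"),
        Err(transient) => println!("transient failure: {transient:#}"),
    }
}
```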

@@ -180,16 +180,16 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
// Run compaction
if let Err(e) = tenant.compaction_iteration(&cancel, &ctx).await {
let wait_duration = backoff::exponential_backoff_duration_seconds(
error_run_count,
error_run_count + 1,
1.0,
MAX_BACKOFF_SECS,
);
error_run_count += 1;
let wait_duration = Duration::from_secs_f64(wait_duration);
error!(
"Compaction failed {error_run_count} times, retrying in {:?}: {e:?}",
wait_duration
"Compaction failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
);
Duration::from_secs_f64(wait_duration)
wait_duration
} else {
error_run_count = 0;
period
@@ -198,6 +198,10 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Compaction);
// Perhaps we did no work and the walredo process has been idle for some time:
// give it a chance to shut down to avoid leaving the walredo process running indefinitely.
tenant.walredo_mgr.maybe_quiesce(period * 10);
// Sleep
if tokio::time::timeout(sleep_duration, cancel.cancelled())
.await
@@ -257,20 +261,20 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
} else {
// Run gc
let res = tenant
.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx)
.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &cancel, &ctx)
.await;
if let Err(e) = res {
let wait_duration = backoff::exponential_backoff_duration_seconds(
error_run_count,
error_run_count + 1,
1.0,
MAX_BACKOFF_SECS,
);
error_run_count += 1;
let wait_duration = Duration::from_secs_f64(wait_duration);
error!(
"Gc failed {error_run_count} times, retrying in {:?}: {e:?}",
wait_duration
"Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
);
Duration::from_secs_f64(wait_duration)
wait_duration
} else {
error_run_count = 0;
period

View File
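The retry arms above now pass `error_run_count + 1` into the backoff helper and reuse the computed `wait_duration` in the log message. A self-contained sketch of the loop shape, using an illustrative backoff function (not the real `utils::backoff` implementation), under the assumption that a zero failure count would otherwise map to a zero delay:

```rust
use std::time::Duration;

/// Illustrative backoff only -- not the real utils::backoff implementation.
/// Returns no delay for `n_failures == 0` and doubles from `base` afterwards,
/// capped at `max`.
fn backoff_secs(n_failures: u32, base: f64, max: f64) -> f64 {
    if n_failures == 0 {
        0.0
    } else {
        (base * 2f64.powi(n_failures as i32 - 1)).min(max)
    }
}

fn main() {
    const MAX_BACKOFF_SECS: f64 = 300.0;
    let mut error_run_count: u32 = 0;

    for attempt in 0..5 {
        let failed = attempt < 3; // pretend the first three iterations fail
        let sleep = if failed {
            // Count the current failure *before* computing the delay, so even the
            // first error waits `base` seconds instead of retrying immediately.
            let secs = backoff_secs(error_run_count + 1, 1.0, MAX_BACKOFF_SECS);
            error_run_count += 1;
            Duration::from_secs_f64(secs)
        } else {
            error_run_count = 0;
            Duration::from_secs(20) // normal period between iterations
        };
        println!("attempt {attempt}: sleeping {sleep:?}");
    }
}
```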

@@ -10,6 +10,7 @@ mod walreceiver;
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use enumset::EnumSet;
use fail::fail_point;
use itertools::Itertools;
use pageserver_api::models::{
@@ -437,6 +438,11 @@ pub enum LogicalSizeCalculationCause {
TenantSizeHandler,
}
#[derive(enumset::EnumSetType)]
pub(crate) enum CompactFlags {
ForceRepartition,
}
/// Public interface functions
impl Timeline {
/// Get the LSN where this branch was created
@@ -694,6 +700,7 @@ impl Timeline {
pub(crate) async fn compact(
self: &Arc<Self>,
cancel: &CancellationToken,
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
) -> Result<(), CompactionError> {
// this wait probably never needs any "long time spent" logging, because we already nag if
@@ -766,6 +773,7 @@ impl Timeline {
.repartition(
self.get_last_record_lsn(),
self.get_compaction_target_size(),
flags,
ctx,
)
.await
@@ -1711,6 +1719,30 @@ impl Timeline {
if let Some(rtc) = self.remote_client.as_ref() {
rtc.schedule_layer_file_deletion(&needs_cleanup)?;
rtc.schedule_index_upload_for_file_changes()?;
// This barrier orders above DELETEs before any later operations.
// This is critical because code executing after the barrier might
// create again objects with the same key that we just scheduled for deletion.
// For example, if we just scheduled deletion of an image layer "from the future",
// later compaction might run again and re-create the same image layer.
// "from the future" here means an image layer whose LSN is > IndexPart::disk_consistent_lsn.
// "same" here means same key range and LSN.
//
// Without a barrier between above DELETEs and the re-creation's PUTs,
// the upload queue may execute the PUT first, then the DELETE.
// In our example, we will end up with an IndexPart referencing a non-existent object.
//
// 1. a future image layer is created and uploaded
// 2. ps restart
// 3. the future layer from (1) is deleted during load layer map
// 4. image layer is re-created and uploaded
// 5. deletion queue would like to delete (1) but actually deletes (4)
// 6. delete by name works as expected, but it now deletes the wrong (later) version
//
// See https://github.com/neondatabase/neon/issues/5878
//
// NB: generation numbers naturally protect against this because they disambiguate
// (1) and (4)
rtc.schedule_barrier()?;
// Tenant::create_timeline will wait for these uploads to happen before returning, or
// on retry.
}
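A toy illustration of the ordering hazard described in the comment above: if the DELETE of the old layer and the PUT of its re-created replacement can complete in either order, the index can end up referencing an object that was just deleted; a barrier forces everything scheduled before it to finish first. This is a deliberately simplified model, not the real upload/deletion queue semantics:

```rust
use std::collections::VecDeque;

#[derive(Debug)]
enum Op {
    Delete(&'static str),
    Upload(&'static str),
    Barrier,
}

/// Waits for everything scheduled so far to finish. In this toy model, ops
/// between barriers may complete in any order (dramatized as reverse order).
fn flush(in_flight: &mut VecDeque<Op>) {
    while let Some(op) = in_flight.pop_back() {
        println!("  completed {op:?}");
    }
}

fn run(ops: Vec<Op>) {
    let mut in_flight = VecDeque::new();
    for op in ops {
        match op {
            Op::Barrier => flush(&mut in_flight),
            other => in_flight.push_back(other),
        }
    }
    flush(&mut in_flight);
}

fn main() {
    let layer = "layer-with-same-key-and-lsn";

    println!("without a barrier (the re-created layer's PUT may finish before the DELETE):");
    run(vec![Op::Delete(layer), Op::Upload(layer)]);

    println!("with a barrier between them (the DELETE is forced to finish first):");
    run(vec![Op::Delete(layer), Op::Barrier, Op::Upload(layer)]);
}
```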
@@ -2525,7 +2557,12 @@ impl Timeline {
// Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
// require downloading anything during initial import.
let (partitioning, _lsn) = self
.repartition(self.initdb_lsn, self.get_compaction_target_size(), ctx)
.repartition(
self.initdb_lsn,
self.get_compaction_target_size(),
EnumSet::empty(),
ctx,
)
.await?;
if self.cancel.is_cancelled() {
@@ -2563,6 +2600,8 @@ impl Timeline {
)
};
pausable_failpoint!("flush-layer-cancel-after-writing-layer-out-pausable");
if self.cancel.is_cancelled() {
return Err(FlushLayerError::Cancelled);
}
@@ -2744,12 +2783,16 @@ impl Timeline {
&self,
lsn: Lsn,
partition_size: u64,
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
) -> anyhow::Result<(KeyPartitioning, Lsn)> {
{
let partitioning_guard = self.partitioning.lock().unwrap();
let distance = lsn.0 - partitioning_guard.1 .0;
if partitioning_guard.1 != Lsn(0) && distance <= self.repartition_threshold {
if partitioning_guard.1 != Lsn(0)
&& distance <= self.repartition_threshold
&& !flags.contains(CompactFlags::ForceRepartition)
{
debug!(
distance,
threshold = self.repartition_threshold,
@@ -3497,21 +3540,22 @@ impl Timeline {
}
// FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
let mut layer_paths: Vec<Utf8PathBuf> = new_layers
let layer_paths: Vec<Utf8PathBuf> = new_layers
.iter()
.map(|l| l.local_path().to_owned())
.collect();
// Fsync all the layer files and directory using multiple threads to
// minimize latency.
//
// FIXME: spawn_blocking above for this
par_fsync::par_fsync(&layer_paths).context("fsync all new layers")?;
par_fsync::par_fsync_async(&layer_paths)
.await
.context("fsync all new layers")?;
par_fsync::par_fsync(&[self.conf.timeline_path(&self.tenant_id, &self.timeline_id)])
let timeline_dir = self.conf.timeline_path(&self.tenant_id, &self.timeline_id);
par_fsync::par_fsync_async(&[timeline_dir])
.await
.context("fsync of timeline dir")?;
layer_paths.pop().unwrap();
}
stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
@@ -3684,6 +3728,7 @@ impl Timeline {
retain_lsns: Vec<Lsn>,
cutoff_horizon: Lsn,
pitr: Duration,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<()> {
// First, calculate pitr_cutoff_timestamp and then convert it to LSN.
@@ -3697,7 +3742,10 @@ impl Timeline {
if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) {
let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp);
match self.find_lsn_for_timestamp(pitr_timestamp, ctx).await? {
match self
.find_lsn_for_timestamp(pitr_timestamp, cancel, ctx)
.await?
{
LsnForTimestamp::Present(lsn) => lsn,
LsnForTimestamp::Future(lsn) => {
// The timestamp is in the future. That sounds impossible,

View File
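`CompactFlags` is an `enumset::EnumSetType`, which is why the call sites above pass `EnumSet::empty()` and `repartition` checks `flags.contains(CompactFlags::ForceRepartition)`. A small usage sketch of that crate's API (the `DryRun` variant is made up purely to show unions):

```rust
use enumset::{EnumSet, EnumSetType};

// Same shape as CompactFlags in the diff above, plus one illustrative variant.
#[derive(EnumSetType)]
enum CompactFlags {
    ForceRepartition,
    DryRun, // hypothetical, only to demonstrate unions
}

fn compact(flags: EnumSet<CompactFlags>) {
    if flags.contains(CompactFlags::ForceRepartition) {
        println!("repartitioning even if below the threshold");
    }
    if flags.is_empty() {
        println!("default compaction behaviour");
    }
}

fn main() {
    // The call sites in the tests pass an empty set:
    compact(EnumSet::empty());
    // A single flag converts into a one-element set:
    compact(CompactFlags::ForceRepartition.into());
    // Flags can be OR-ed together:
    compact(CompactFlags::ForceRepartition | CompactFlags::DryRun);
}
```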

@@ -110,35 +110,6 @@ async fn set_deleted_in_remote_index(timeline: &Timeline) -> Result<(), DeleteTi
Ok(())
}
// We delete local files first, so if pageserver restarts after local files deletion then remote deletion is not continued.
// This can be solved with inversion of these steps. But even if these steps are inverted then, when index_part.json
// gets deleted, there is no way to distinguish between "this timeline is good, we just didn't upload it to remote"
// and "this timeline is deleted, we should continue with removal of local state". So to avoid the ambiguity we use a mark file.
// After the index part is deleted, the presence of this mark file identifies a deletion intention.
// So we can just remove the mark file.
async fn create_delete_mark(
conf: &PageServerConf,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<(), DeleteTimelineError> {
fail::fail_point!("timeline-delete-before-delete-mark", |_| {
Err(anyhow::anyhow!(
"failpoint: timeline-delete-before-delete-mark"
))?
});
let marker_path = conf.timeline_delete_mark_file_path(tenant_id, timeline_id);
// Note: we're ok to replace existing file.
let _ = std::fs::OpenOptions::new()
.write(true)
.create(true)
.open(&marker_path)
.with_context(|| format!("could not create delete marker file {marker_path:?}"))?;
crashsafe::fsync_file_and_parent(&marker_path).context("sync_mark")?;
Ok(())
}
/// Grab the layer_removal_cs lock, and actually perform the deletion.
///
/// This lock prevents GC or compaction from running at the same time.
@@ -311,6 +282,8 @@ async fn cleanup_remaining_timeline_fs_traces(
.context("fsync_pre_mark_remove")?;
// Remove delete mark
// TODO: once we are confident that no more exist in the field, remove this
// line. It cleans up a legacy marker file that might in rare cases be present.
tokio::fs::remove_file(conf.timeline_delete_mark_file_path(tenant_id, timeline_id))
.await
.or_else(fs_ext::ignore_not_found)
@@ -391,8 +364,6 @@ impl DeleteTimelineFlow {
set_deleted_in_remote_index(&timeline).await?;
create_delete_mark(tenant.conf, timeline.tenant_id, timeline.timeline_id).await?;
fail::fail_point!("timeline-delete-before-schedule", |_| {
Err(anyhow::anyhow!(
"failpoint: timeline-delete-before-schedule"
@@ -464,10 +435,6 @@ impl DeleteTimelineFlow {
guard.mark_in_progress()?;
// Note that delete mark can be missing on resume
// because we create delete mark after we set deleted_at in the index part.
create_delete_mark(tenant.conf, tenant.tenant_id, timeline_id).await?;
Self::schedule_background(guard, tenant.conf, tenant, timeline);
Ok(())

View File

@@ -351,7 +351,7 @@ impl Timeline {
match state.last_layer_access_imitation {
Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
_ => {
self.imitate_synthetic_size_calculation_worker(&tenant, ctx, cancel)
self.imitate_synthetic_size_calculation_worker(&tenant, cancel, ctx)
.await;
state.last_layer_access_imitation = Some(tokio::time::Instant::now());
}
@@ -417,8 +417,8 @@ impl Timeline {
async fn imitate_synthetic_size_calculation_worker(
&self,
tenant: &Arc<Tenant>,
ctx: &RequestContext,
cancel: &CancellationToken,
ctx: &RequestContext,
) {
if self.conf.metric_collection_endpoint.is_none() {
// We don't start the consumption metrics task if this is not set in the config.
@@ -457,6 +457,7 @@ impl Timeline {
None,
&mut throwaway_cache,
LogicalSizeCalculationCause::EvictionTaskImitation,
cancel,
ctx,
)
.instrument(info_span!("gather_inputs"));

View File

@@ -45,12 +45,20 @@ impl<'t> UninitializedTimeline<'t> {
let timeline_id = self.timeline_id;
let tenant_id = self.owning_tenant.tenant_id;
let (new_timeline, uninit_mark) = self.raw_timeline.take().with_context(|| {
format!("No timeline for initalization found for {tenant_id}/{timeline_id}")
})?;
if self.raw_timeline.is_none() {
return Err(anyhow::anyhow!(
"No timeline for initialization found for {tenant_id}/{timeline_id}"
));
}
// Check that the caller initialized disk_consistent_lsn
let new_disk_consistent_lsn = new_timeline.get_disk_consistent_lsn();
let new_disk_consistent_lsn = self
.raw_timeline
.as_ref()
.expect("checked above")
.0
.get_disk_consistent_lsn();
anyhow::ensure!(
new_disk_consistent_lsn.is_valid(),
"new timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn"
@@ -62,6 +70,13 @@ impl<'t> UninitializedTimeline<'t> {
"Found freshly initialized timeline {tenant_id}/{timeline_id} in the tenant map"
),
Entry::Vacant(v) => {
// after the take() here there should be no fallible operations, because the drop guard would not
// clean up after them, which would for example block tenant deletion
let (new_timeline, uninit_mark) =
self.raw_timeline.take().expect("already checked");
// this is the mutual exclusion between different retries to create the timeline;
// this should be an assertion.
uninit_mark.remove_uninit_mark().with_context(|| {
format!(
"Failed to remove uninit mark file for timeline {tenant_id}/{timeline_id}"
@@ -70,10 +85,10 @@ impl<'t> UninitializedTimeline<'t> {
v.insert(Arc::clone(&new_timeline));
new_timeline.maybe_spawn_flush_loop();
Ok(new_timeline)
}
}
Ok(new_timeline)
}
/// Prepares timeline data by loading it from the basebackup archive.

View File
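The reshuffling above moves every fallible check before the `take()` of the raw timeline, so that on failure the creation guard still owns the resource and its drop cleanup (e.g. removing the uninit mark) can run. A tiny self-contained sketch of why that ordering matters with a cleanup-on-drop guard; the guard, resource and error here are made up:

```rust
// A guard that owns a partially-created resource and cleans it up unless it is
// explicitly disarmed by `take()`.
struct CreationGuard {
    resource: Option<String>,
}

impl CreationGuard {
    fn take(&mut self) -> String {
        self.resource.take().expect("taken twice")
    }
}

impl Drop for CreationGuard {
    fn drop(&mut self) {
        if let Some(r) = &self.resource {
            // Cleanup only runs if nobody took the resource.
            println!("cleaning up incomplete resource {r:?}");
        }
    }
}

fn finish_creation(mut guard: CreationGuard, valid: bool) -> anyhow::Result<String> {
    // Do all fallible validation *before* taking from the guard. If we took the
    // resource first and then bailed, Drop would see `None`, skip cleanup, and
    // leave the half-created state behind (e.g. an uninit marker file).
    if !valid {
        anyhow::bail!("validation failed; guard will clean up on drop");
    }
    Ok(guard.take())
}

fn main() {
    // Failure path: the guard's Drop still runs its cleanup.
    let guard = CreationGuard { resource: Some("timeline-dir".to_string()) };
    println!("{:?}", finish_creation(guard, false));

    // Success path: the resource is handed out and no cleanup happens.
    let guard = CreationGuard { resource: Some("timeline-dir".to_string()) };
    println!("{:?}", finish_creation(guard, true));
}
```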

@@ -91,6 +91,7 @@ struct ProcessOutput {
pub struct PostgresRedoManager {
tenant_id: TenantId,
conf: &'static PageServerConf,
last_redo_at: std::sync::Mutex<Option<Instant>>,
redo_process: RwLock<Option<Arc<WalRedoProcess>>>,
}
@@ -187,10 +188,26 @@ impl PostgresRedoManager {
PostgresRedoManager {
tenant_id,
conf,
last_redo_at: std::sync::Mutex::default(),
redo_process: RwLock::new(None),
}
}
/// This type doesn't have its own background task to check for idleness: we
/// rely on our owner calling this function periodically in its own housekeeping
/// loops.
pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
if let Ok(g) = self.last_redo_at.try_lock() {
if let Some(last_redo_at) = *g {
if last_redo_at.elapsed() >= idle_timeout {
drop(g);
let mut guard = self.redo_process.write().unwrap();
*guard = None;
}
}
}
}
///
/// Process one request for WAL redo using wal-redo postgres
///
@@ -205,6 +222,8 @@ impl PostgresRedoManager {
wal_redo_timeout: Duration,
pg_version: u32,
) -> anyhow::Result<Bytes> {
*(self.last_redo_at.lock().unwrap()) = Some(Instant::now());
let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
const MAX_RETRY_ATTEMPTS: u32 = 1;
let mut n_attempts = 0u32;
@@ -348,12 +367,13 @@ impl PostgresRedoManager {
self.apply_record_neon(key, &mut page, *record_lsn, record)?;
}
// Success!
let end_time = Instant::now();
let duration = end_time.duration_since(start_time);
let duration = start_time.elapsed();
// FIXME: using the same metric here creates a bimodal distribution by default, and because
// there could be multiple batch sizes this would be N+1 modal.
WAL_REDO_TIME.observe(duration.as_secs_f64());
debug!(
"neon applied {} WAL records in {} ms to reconstruct page image at LSN {}",
"neon applied {} WAL records in {} us to reconstruct page image at LSN {}",
records.len(),
duration.as_micros(),
lsn

View File
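`maybe_quiesce` deliberately uses `try_lock` on `last_redo_at` so the owner's housekeeping call never blocks behind an in-flight redo request: if the lock is busy, the manager is obviously not idle. A reduced Rust sketch of the same shape, with a string standing in for the wal-redo process handle:

```rust
use std::sync::{Mutex, RwLock};
use std::time::{Duration, Instant};

struct RedoManager {
    last_redo_at: Mutex<Option<Instant>>,
    // Stand-in for the wal-redo process handle.
    redo_process: RwLock<Option<String>>,
}

impl RedoManager {
    /// Called opportunistically from the owner's housekeeping loop.
    fn maybe_quiesce(&self, idle_timeout: Duration) {
        // try_lock: if a redo request currently holds the timestamp lock we are
        // clearly not idle, so skip this round instead of blocking.
        if let Ok(g) = self.last_redo_at.try_lock() {
            if let Some(last_redo_at) = *g {
                if last_redo_at.elapsed() >= idle_timeout {
                    drop(g);
                    // Dropping the handle shuts the idle process down.
                    *self.redo_process.write().unwrap() = None;
                    println!("quiesced idle walredo process");
                }
            }
        }
    }

    fn request_redo(&self) {
        *self.last_redo_at.lock().unwrap() = Some(Instant::now());
        // ... spawn the process on demand and apply records ...
    }
}

fn main() {
    let mgr = RedoManager {
        last_redo_at: Mutex::new(None),
        redo_process: RwLock::new(Some("walredo".to_string())),
    };
    mgr.request_redo();
    std::thread::sleep(Duration::from_millis(20));
    // With a 10ms idle timeout the process is considered idle and dropped.
    mgr.maybe_quiesce(Duration::from_millis(10));
}
```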

@@ -20,7 +20,7 @@ SHLIB_LINK_INTERNAL = $(libpq)
SHLIB_LINK = -lcurl
EXTENSION = neon
DATA = neon--1.0.sql
DATA = neon--1.0.sql neon--1.0--1.1.sql
PGFILEDESC = "neon - cloud storage for PostgreSQL"
EXTRA_CLEAN = \

20
pgxn/neon/README.md Normal file
View File

@@ -0,0 +1,20 @@
The neon extension consists of several parts:
### shared preload library `neon.so`
- implements the storage manager API and network communication with the remote page server.
- walproposer: implements the broadcast protocol between postgres and the WAL safekeepers.
- control plane connector: Captures updates to roles/databases using ProcessUtility_hook and sends them to the control plane.
- remote extension server: Requests compute_ctl to download extension files.
- file_cache: Local file cache used to temporarily store relation pages in the local file system for better performance.
- relsize_cache: Relation size cache for better neon performance.
### SQL functions in `neon--*.sql`
Utility functions that expose Neon-specific information to users and to metrics collection.
This extension is created in all databases in the cluster by default.

View File

@@ -475,6 +475,12 @@ NeonXactCallback(XactEvent event, void *arg)
Assert(CurrentDdlTable == &RootTable);
}
static bool
RoleIsNeonSuperuser(const char *role_name)
{
return strcmp(role_name, "neon_superuser") == 0;
}
static void
HandleCreateDb(CreatedbStmt *stmt)
{
@@ -501,9 +507,16 @@ HandleCreateDb(CreatedbStmt *stmt)
entry->type = Op_Set;
if (downer && downer->arg)
entry->owner = get_role_oid(defGetString(downer), false);
{
const char *owner_name = defGetString(downer);
if (RoleIsNeonSuperuser(owner_name))
elog(ERROR, "can't create a database with owner neon_superuser");
entry->owner = get_role_oid(owner_name, false);
}
else
{
entry->owner = GetUserId();
}
}
static void
@@ -522,8 +535,10 @@ HandleAlterOwner(AlterOwnerStmt *stmt)
if (!found)
memset(entry->old_name, 0, sizeof(entry->old_name));
entry->owner = get_role_oid(get_rolespec_name(stmt->newowner), false);
const char *new_owner = get_rolespec_name(stmt->newowner);
if (RoleIsNeonSuperuser(new_owner))
elog(ERROR, "can't alter owner to neon_superuser");
entry->owner = get_role_oid(new_owner, false);
entry->type = Op_Set;
}
@@ -617,6 +632,9 @@ HandleAlterRole(AlterRoleStmt *stmt)
InitRoleTableIfNeeded();
DefElem *dpass = NULL;
ListCell *option;
const char *role_name = stmt->role->rolename;
if (RoleIsNeonSuperuser(role_name))
elog(ERROR, "can't ALTER neon_superuser");
foreach(option, stmt->options)
{
@@ -631,7 +649,7 @@ HandleAlterRole(AlterRoleStmt *stmt)
bool found = false;
RoleEntry *entry = hash_search(
CurrentDdlTable->role_table,
stmt->role->rolename,
role_name,
HASH_ENTER,
&found);

View File

@@ -32,11 +32,13 @@
#include "storage/latch.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "utils/builtins.h"
#include "utils/dynahash.h"
#include "utils/guc.h"
#include "storage/fd.h"
#include "storage/pg_shmem.h"
#include "storage/buf_internals.h"
#include "pgstat.h"
/*
* Local file cache is used to temporarily store relation pages in the local file system.
@@ -65,6 +67,7 @@
typedef struct FileCacheEntry
{
BufferTag key;
uint32 hash;
uint32 offset;
uint32 access_count;
uint32 bitmap[BLOCKS_PER_CHUNK/32];
@@ -76,6 +79,10 @@ typedef struct FileCacheControl
uint64 generation; /* generation is needed to handle correct hash reenabling */
uint32 size; /* size of cache file in chunks */
uint32 used; /* number of used chunks */
uint32 limit; /* shared copy of lfc_size_limit */
uint64 hits;
uint64 misses;
uint64 writes;
dlist_head lru; /* double linked list for LRU replacement algorithm */
} FileCacheControl;
@@ -91,10 +98,12 @@ static shmem_startup_hook_type prev_shmem_startup_hook;
static shmem_request_hook_type prev_shmem_request_hook;
#endif
void FileCacheMonitorMain(Datum main_arg);
#define LFC_ENABLED() (lfc_ctl->limit != 0)
void PGDLLEXPORT FileCacheMonitorMain(Datum main_arg);
/*
* Local file cache is mandatory and Neon can work without it.
* Local file cache is optional and Neon can work without it.
* In case of any errors with this cache, we should disable it but not throw an error.
* We should also allow re-enabling it once the source of failure (lack of disk space, permissions, ...) is fixed.
* All cache content should be invalidated to avoid reading stale or corrupted data
@@ -102,49 +111,77 @@ void FileCacheMonitorMain(Datum main_arg);
static void
lfc_disable(char const* op)
{
HASH_SEQ_STATUS status;
FileCacheEntry* entry;
int fd;
elog(WARNING, "Failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path);
/* Invalidate hash */
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (LFC_ENABLED())
{
HASH_SEQ_STATUS status;
FileCacheEntry* entry;
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
hash_search_with_hash_value(lfc_hash, &entry->key, entry->hash, HASH_REMOVE, NULL);
}
lfc_ctl->generation += 1;
lfc_ctl->size = 0;
lfc_ctl->used = 0;
lfc_ctl->limit = 0;
dlist_init(&lfc_ctl->lru);
if (lfc_desc > 0)
{
/* If the reason of error is ENOSPC, then truncation of file may help to reclaim some space */
int rc = ftruncate(lfc_desc, 0);
if (rc < 0)
elog(WARNING, "Failed to truncate local file cache %s: %m", lfc_path);
}
}
/* We need to use unlink to avoid races in LFC write, because it is not protected by the lock */
unlink(lfc_path);
fd = BasicOpenFile(lfc_path, O_RDWR|O_CREAT|O_TRUNC);
if (fd < 0)
elog(WARNING, "Failed to recreate local file cache %s: %m", lfc_path);
else
close(fd);
LWLockRelease(lfc_lock);
if (lfc_desc > 0)
close(lfc_desc);
lfc_desc = -1;
lfc_size_limit = 0;
}
/* Invalidate hash */
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
hash_search(lfc_hash, &entry->key, HASH_REMOVE, NULL);
memset(entry->bitmap, 0, sizeof entry->bitmap);
}
hash_seq_term(&status);
lfc_ctl->generation += 1;
lfc_ctl->size = 0;
lfc_ctl->used = 0;
dlist_init(&lfc_ctl->lru);
LWLockRelease(lfc_lock);
/*
* This check is done without obtaining lfc_lock, so it is unreliable
*/
static bool
lfc_maybe_disabled(void)
{
return !lfc_ctl || !LFC_ENABLED();
}
static bool
lfc_ensure_opened(void)
{
bool enabled = !lfc_maybe_disabled();
/* Open cache file if not done yet */
if (lfc_desc <= 0)
if (lfc_desc <= 0 && enabled)
{
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
lfc_desc = BasicOpenFile(lfc_path, O_RDWR);
if (lfc_desc < 0) {
lfc_disable("open");
return false;
}
}
return true;
return enabled;
}
static void
@@ -163,6 +200,7 @@ lfc_shmem_startup(void)
lfc_ctl = (FileCacheControl*)ShmemInitStruct("lfc", sizeof(FileCacheControl), &found);
if (!found)
{
int fd;
uint32 lfc_size = SIZE_MB_TO_CHUNKS(lfc_max_size);
lfc_lock = (LWLockId)GetNamedLWLockTranche("lfc_lock");
info.keysize = sizeof(BufferTag);
@@ -175,10 +213,23 @@ lfc_shmem_startup(void)
lfc_ctl->generation = 0;
lfc_ctl->size = 0;
lfc_ctl->used = 0;
lfc_ctl->hits = 0;
lfc_ctl->misses = 0;
lfc_ctl->writes = 0;
dlist_init(&lfc_ctl->lru);
/* Remove file cache on restart */
(void)unlink(lfc_path);
/* Recreate file cache on restart */
fd = BasicOpenFile(lfc_path, O_RDWR|O_CREAT|O_TRUNC);
if (fd < 0)
{
elog(WARNING, "Failed to create local file cache %s: %m", lfc_path);
lfc_ctl->limit = 0;
}
else
{
close(fd);
lfc_ctl->limit = SIZE_MB_TO_CHUNKS(lfc_size_limit);
}
}
LWLockRelease(AddinShmemInitLock);
}
@@ -195,6 +246,17 @@ lfc_shmem_request(void)
RequestNamedLWLockTranche("lfc_lock", 1);
}
static bool
is_normal_backend(void)
{
/*
* The stats collector detaches shared memory, so we should not try to access shared memory here.
* Parallel workers first assign the default value (0), so we do not perform truncation in parallel workers.
* The Postmaster can handle SIGHUP and has access to shared memory (UsedShmemSegAddr != NULL), but has no PGPROC.
*/
return lfc_ctl && MyProc && UsedShmemSegAddr && !IsParallelWorker();
}
static bool
lfc_check_limit_hook(int *newval, void **extra, GucSource source)
{
@@ -210,25 +272,15 @@ static void
lfc_change_limit_hook(int newval, void *extra)
{
uint32 new_size = SIZE_MB_TO_CHUNKS(newval);
/*
* Stats collector detach shared memory, so we should not try to access shared memory here.
* Parallel workers first assign default value (0), so not perform truncation in parallel workers.
* The Postmaster can handle SIGHUP and it has access to shared memory (UsedShmemSegAddr != NULL), but has no PGPROC.
*/
if (!lfc_ctl || !MyProc || !UsedShmemSegAddr || IsParallelWorker())
if (!is_normal_backend())
return;
if (!lfc_ensure_opened())
return;
/* Open cache file if not done yet */
if (lfc_desc <= 0)
{
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
if (lfc_desc < 0) {
elog(WARNING, "Failed to open file cache %s: %m, disabling file cache", lfc_path);
lfc_size_limit = 0; /* disable file cache */
return;
}
}
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
while (new_size < lfc_ctl->used && !dlist_is_empty(&lfc_ctl->lru))
{
/* Shrink cache by throwing away least recently accessed chunks and returning their space to file system */
@@ -238,10 +290,12 @@ lfc_change_limit_hook(int newval, void *extra)
if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE, (off_t)victim->offset*BLOCKS_PER_CHUNK*BLCKSZ, BLOCKS_PER_CHUNK*BLCKSZ) < 0)
elog(LOG, "Failed to punch hole in file: %m");
#endif
hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL);
hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
lfc_ctl->used -= 1;
}
lfc_ctl->limit = new_size;
elog(DEBUG1, "set local file cache limit to %d", new_size);
LWLockRelease(lfc_lock);
}
@@ -255,6 +309,7 @@ lfc_init(void)
if (!process_shared_preload_libraries_in_progress)
elog(ERROR, "Neon module should be loaded via shared_preload_libraries");
DefineCustomIntVariable("neon.max_file_cache_size",
"Maximal size of Neon local file cache",
NULL,
@@ -315,10 +370,10 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
BufferTag tag;
FileCacheEntry* entry;
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
bool found;
bool found = false;
uint32 hash;
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return false;
CopyNRelFileInfoToBufTag(tag, rinfo);
@@ -327,8 +382,11 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_SHARED);
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
found = entry != NULL && (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) != 0;
if (LFC_ENABLED())
{
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
found = entry != NULL && (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) != 0;
}
LWLockRelease(lfc_lock);
return found;
}
@@ -345,7 +403,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
uint32 hash;
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return;
CopyNRelFileInfoToBufTag(tag, rinfo);
@@ -355,6 +413,13 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (!LFC_ENABLED())
{
LWLockRelease(lfc_lock);
return;
}
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, &found);
if (!found)
@@ -405,7 +470,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
/*
* Try to read page from local cache.
* Returns true if page is found in local cache.
* In case of error lfc_size_limit is set to zero to disable any further opera-tins with cache.
* In case of error local file cache is disabled (lfc->limit is set to zero).
*/
bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
@@ -420,7 +485,7 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
uint64 generation;
uint32 entry_offset;
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return false;
if (!lfc_ensure_opened())
@@ -432,10 +497,18 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (!LFC_ENABLED())
{
LWLockRelease(lfc_lock);
return false;
}
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
if (entry == NULL || (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) == 0)
{
/* Page is not cached */
lfc_ctl->misses += 1;
LWLockRelease(lfc_lock);
return false;
}
@@ -456,8 +529,11 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
/* Place entry to the head of LRU list */
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (lfc_ctl->generation == generation)
{
Assert(LFC_ENABLED());
lfc_ctl->hits += 1;
Assert(entry->access_count > 0);
if (--entry->access_count == 0)
dlist_push_tail(&lfc_ctl->lru, &entry->lru_node);
@@ -488,8 +564,10 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
bool found;
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
uint32 hash;
uint64 generation;
uint32 entry_offset;
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return;
if (!lfc_ensure_opened())
@@ -497,12 +575,17 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
tag.forkNum = forkNum;
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK-1);
CopyNRelFileInfoToBufTag(tag, rinfo);
hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (!LFC_ENABLED())
{
LWLockRelease(lfc_lock);
return;
}
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found);
if (found)
@@ -521,13 +604,13 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
* there should be a very large number of concurrent IO operations and they are limited by max_connections,
* we prefer not to complicate the code and use the second approach.
*/
if (lfc_ctl->used >= SIZE_MB_TO_CHUNKS(lfc_size_limit) && !dlist_is_empty(&lfc_ctl->lru))
if (lfc_ctl->used >= lfc_ctl->limit && !dlist_is_empty(&lfc_ctl->lru))
{
/* Cache overflow: evict least recently used chunk */
FileCacheEntry* victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru));
Assert(victim->access_count == 0);
entry->offset = victim->offset; /* grab victim's chunk */
hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL);
hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
elog(DEBUG2, "Swap file cache page");
}
else
@@ -536,27 +619,140 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
entry->offset = lfc_ctl->size++; /* allocate new chunk at end of file */
}
entry->access_count = 1;
entry->hash = hash;
memset(entry->bitmap, 0, sizeof entry->bitmap);
}
rc = pwrite(lfc_desc, buffer, BLCKSZ, ((off_t)entry->offset*BLOCKS_PER_CHUNK + chunk_offs)*BLCKSZ);
generation = lfc_ctl->generation;
entry_offset = entry->offset;
lfc_ctl->writes += 1;
LWLockRelease(lfc_lock);
rc = pwrite(lfc_desc, buffer, BLCKSZ, ((off_t)entry_offset*BLOCKS_PER_CHUNK + chunk_offs)*BLCKSZ);
if (rc != BLCKSZ)
{
LWLockRelease(lfc_lock);
lfc_disable("write");
}
else
{
/* Place entry to the head of LRU list */
Assert(entry->access_count > 0);
if (--entry->access_count == 0)
dlist_push_tail(&lfc_ctl->lru, &entry->lru_node);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (lfc_ctl->generation == generation)
{
Assert(LFC_ENABLED());
/* Place entry to the head of LRU list */
Assert(entry->access_count > 0);
if (--entry->access_count == 0)
dlist_push_tail(&lfc_ctl->lru, &entry->lru_node);
entry->bitmap[chunk_offs >> 5] |= (1 << (chunk_offs & 31));
}
entry->bitmap[chunk_offs >> 5] |= (1 << (chunk_offs & 31));
LWLockRelease(lfc_lock);
}
}
typedef struct
{
TupleDesc tupdesc;
} NeonGetStatsCtx;
#define NUM_NEON_GET_STATS_COLS 2
#define NUM_NEON_GET_STATS_ROWS 3
PG_FUNCTION_INFO_V1(neon_get_lfc_stats);
Datum
neon_get_lfc_stats(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
NeonGetStatsCtx* fctx;
MemoryContext oldcontext;
TupleDesc tupledesc;
Datum result;
HeapTuple tuple;
char const* key;
uint64 value;
Datum values[NUM_NEON_GET_STATS_COLS];
bool nulls[NUM_NEON_GET_STATS_COLS];
if (SRF_IS_FIRSTCALL())
{
funcctx = SRF_FIRSTCALL_INIT();
/* Switch context when allocating stuff to be used in later calls */
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
/* Create a user function context for cross-call persistence */
fctx = (NeonGetStatsCtx*) palloc(sizeof(NeonGetStatsCtx));
/* Construct a tuple descriptor for the result rows. */
tupledesc = CreateTemplateTupleDesc(NUM_NEON_GET_STATS_COLS);
TupleDescInitEntry(tupledesc, (AttrNumber) 1, "lfc_key",
TEXTOID, -1, 0);
TupleDescInitEntry(tupledesc, (AttrNumber) 2, "lfc_value",
INT8OID, -1, 0);
fctx->tupdesc = BlessTupleDesc(tupledesc);
funcctx->max_calls = NUM_NEON_GET_STATS_ROWS;
funcctx->user_fctx = fctx;
/* Return to original context when allocating transient memory */
MemoryContextSwitchTo(oldcontext);
}
funcctx = SRF_PERCALL_SETUP();
/* Get the saved state */
fctx = (NeonGetStatsCtx*) funcctx->user_fctx;
switch (funcctx->call_cntr)
{
case 0:
key = "file_cache_misses";
if (lfc_ctl)
value = lfc_ctl->misses;
break;
case 1:
key = "file_cache_hits";
if (lfc_ctl)
value = lfc_ctl->hits;
break;
case 2:
key = "file_cache_used";
if (lfc_ctl)
value = lfc_ctl->used;
break;
case 3:
key = "file_cache_writes";
if (lfc_ctl)
value = lfc_ctl->writes;
break;
default:
SRF_RETURN_DONE(funcctx);
}
values[0] = PointerGetDatum(cstring_to_text(key));
nulls[0] = false;
if (lfc_ctl)
{
nulls[1] = false;
values[1] = Int64GetDatum(value);
}
else
nulls[1] = true;
tuple = heap_form_tuple(fctx->tupdesc, values, nulls);
result = HeapTupleGetDatum(tuple);
SRF_RETURN_NEXT(funcctx, result);
}
/*
* Function returning data from the local file cache
* relation node/tablespace/database/blocknum and access_counter
*/
PG_FUNCTION_INFO_V1(local_cache_pages);
/*
* Record structure holding the to be exposed cache data.
*/
@@ -580,11 +776,6 @@ typedef struct
LocalCachePagesRec *record;
} LocalCachePagesContext;
/*
* Function returning data from the local file cache
* relation node/tablespace/database/blocknum and access_counter
*/
PG_FUNCTION_INFO_V1(local_cache_pages);
#define NUM_LOCALCACHE_PAGES_ELEM 7
@@ -651,15 +842,20 @@ local_cache_pages(PG_FUNCTION_ARGS)
fctx->tupdesc = BlessTupleDesc(tupledesc);
LWLockAcquire(lfc_lock, LW_SHARED);
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
if (lfc_ctl)
{
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
n_pages += (entry->bitmap[i >> 5] & (1 << (i & 31))) != 0;
LWLockAcquire(lfc_lock, LW_SHARED);
if (LFC_ENABLED())
{
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
for (int i = 0; i < BLOCKS_PER_CHUNK/32; i++)
n_pages += pg_popcount32(entry->bitmap[i]);
}
}
}
hash_seq_term(&status);
fctx->record = (LocalCachePagesRec *)
MemoryContextAllocHuge(CurrentMemoryContext,
sizeof(LocalCachePagesRec) * n_pages);
@@ -671,36 +867,35 @@ local_cache_pages(PG_FUNCTION_ARGS)
/* Return to original context when allocating transient memory */
MemoryContextSwitchTo(oldcontext);
/*
* Scan through all the buffers, saving the relevant fields in the
* fctx->record structure.
*
* We don't hold the partition locks, so we don't get a consistent
* snapshot across all buffers, but we do grab the buffer header
* locks, so the information of each buffer is self-consistent.
*/
n_pages = 0;
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
if (n_pages != 0)
{
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
/*
* Scan through all the cache entries, saving the relevant fields in the
* fctx->record structure.
*/
uint32 n = 0;
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
if (entry->bitmap[i >> 5] & (1 << (i & 31)))
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
{
fctx->record[n_pages].pageoffs = entry->offset*BLOCKS_PER_CHUNK + i;
fctx->record[n_pages].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key));
fctx->record[n_pages].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n_pages].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n_pages].forknum = entry->key.forkNum;
fctx->record[n_pages].blocknum = entry->key.blockNum + i;
fctx->record[n_pages].accesscount = entry->access_count;
n_pages += 1;
if (entry->bitmap[i >> 5] & (1 << (i & 31)))
{
fctx->record[n].pageoffs = entry->offset*BLOCKS_PER_CHUNK + i;
fctx->record[n].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].forknum = entry->key.forkNum;
fctx->record[n].blocknum = entry->key.blockNum + i;
fctx->record[n].accesscount = entry->access_count;
n += 1;
}
}
}
Assert(n_pages == n);
}
hash_seq_term(&status);
Assert(n_pages == funcctx->max_calls);
LWLockRelease(lfc_lock);
if (lfc_ctl)
LWLockRelease(lfc_lock);
}
funcctx = SRF_PERCALL_SETUP();

View File
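The reworked `lfc_write` records the cache generation and the chunk offset while holding `lfc_lock`, releases the lock for the `pwrite`, and afterwards only touches the LRU/bitmap if the generation is unchanged (i.e. the cache was not disabled and reset in the meantime). The same lock/IO/re-check pattern, sketched in Rust with a toy cache (all names illustrative):

```rust
use std::sync::Mutex;

struct CacheState {
    generation: u64,
    entries: Vec<u32>, // toy stand-in for the hash table / LRU metadata
}

struct Cache {
    state: Mutex<CacheState>,
}

impl Cache {
    fn write(&self, value: u32) {
        // Phase 1: under the lock, reserve a slot and remember the generation.
        let (generation, slot) = {
            let mut st = self.state.lock().unwrap();
            st.entries.push(0);
            (st.generation, st.entries.len() - 1)
        };

        // Phase 2: the slow I/O happens without holding the lock.
        let written = value; // pretend this is the pwrite()

        // Phase 3: re-acquire the lock and only touch shared state if the cache
        // was not invalidated (generation bumped) while we were writing.
        let mut st = self.state.lock().unwrap();
        if st.generation == generation {
            st.entries[slot] = written;
        }
        // Otherwise the cache was disabled/reset concurrently; our slot is gone.
    }

    fn disable(&self) {
        let mut st = self.state.lock().unwrap();
        st.generation += 1;
        st.entries.clear();
    }
}

fn main() {
    let cache = Cache {
        state: Mutex::new(CacheState { generation: 0, entries: Vec::new() }),
    };
    cache.write(42);
    cache.disable();
    cache.write(7); // still safe even if disable() raced with an in-flight write
}
```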

@@ -0,0 +1,10 @@
\echo Use "ALTER EXTENSION neon UPDATE TO '1.1'" to load this file. \quit
CREATE FUNCTION neon_get_lfc_stats()
RETURNS SETOF RECORD
AS 'MODULE_PATHNAME', 'neon_get_lfc_stats'
LANGUAGE C PARALLEL SAFE;
-- Create a view for convenient access.
CREATE VIEW neon_lfc_stats AS
SELECT P.* FROM neon_get_lfc_stats() AS P (lfc_key text, lfc_value bigint);

View File

@@ -1,4 +1,4 @@
# neon extension
comment = 'cloud storage for PostgreSQL'
default_version = '1.0'
default_version = '1.1'
module_pathname = '$libdir/neon'

View File

@@ -1687,9 +1687,9 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
if (current_size >= ((uint64) max_cluster_size) * 1024 * 1024)
ereport(ERROR,
(errcode(ERRCODE_DISK_FULL),
errmsg("could not extend file because cluster size limit (%d MB) has been exceeded",
errmsg("could not extend file because project size limit (%d MB) has been exceeded",
max_cluster_size),
errhint("This limit is defined by neon.max_cluster_size GUC")));
errhint("This limit is defined externally by the project size limit, and internally by neon.max_cluster_size GUC")));
}
/*

View File

@@ -168,9 +168,18 @@ async fn task_main(
.instrument(tracing::info_span!("handle_client", ?session_id))
);
}
Some(Err(e)) = connections.join_next(), if !connections.is_empty() => {
if !e.is_panic() && !e.is_cancelled() {
warn!("unexpected error from joined connection task: {e:?}");
// Don't modify this unless you read https://docs.rs/tokio/latest/tokio/macro.select.html carefully.
// If this future completes and the pattern doesn't match, this branch is disabled for this call to `select!`.
// This only counts for this loop and it will be enabled again on next `select!`.
//
// Prior code had this as `Some(Err(e))` which _looks_ equivalent to the current setup, but it's not.
// When `connections.join_next()` returned `Some(Ok(()))` (which we expect), it would disable the join_next and it would
// not get called again, even if there are more connections to remove.
Some(res) = connections.join_next() => {
if let Err(e) = res {
if !e.is_panic() && !e.is_cancelled() {
warn!("unexpected error from joined connection task: {e:?}");
}
}
}
_ = cancellation_token.cancelled() => {

View File
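The long comment above is the heart of this change: in `tokio::select!`, a branch whose pattern fails to match is disabled for the remainder of that `select!` call. A minimal reproduction of the corrected pattern, independent of the proxy code:

```rust
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut connections = JoinSet::new();
    for i in 0..3 {
        // All tasks succeed, so join_next() yields Some(Ok(i)).
        connections.spawn(async move { i });
    }

    let mut reaped = 0;
    loop {
        tokio::select! {
            // Matching on `Some(res)` and inspecting the error inside the body
            // keeps this branch usable for successful joins too. With the old
            // `Some(Err(e)) = connections.join_next()` pattern, a Some(Ok(..))
            // would fail the pattern and disable the branch for the rest of
            // this select! call.
            Some(res) = connections.join_next() => {
                if let Err(e) = res {
                    eprintln!("task failed: {e:?}");
                }
                reaped += 1;
            }
            else => break, // all branches disabled: the JoinSet is empty
        }
    }
    assert_eq!(reaped, 3);
}
```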

@@ -248,6 +248,7 @@ impl ConnCfg {
// connect_raw() will not use TLS if sslmode is "disable"
let (client, connection) = self.0.connect_raw(stream, tls).await?;
tracing::Span::current().record("pid", &tracing::field::display(client.get_process_id()));
let stream = connection.stream.into_inner();
info!(

View File

@@ -294,9 +294,18 @@ pub async fn task_main(
}),
);
}
Some(Err(e)) = connections.join_next(), if !connections.is_empty() => {
if !e.is_panic() && !e.is_cancelled() {
warn!("unexpected error from joined connection task: {e:?}");
// Don't modify this unless you read https://docs.rs/tokio/latest/tokio/macro.select.html carefully.
// If this future completes and the pattern doesn't match, this branch is disabled for this call to `select!`.
// This only counts for this loop and it will be enabled again on next `select!`.
//
// Prior code had this as `Some(Err(e))` which _looks_ equivalent to the current setup, but it's not.
// When `connections.join_next()` returned `Some(Ok(()))` (which we expect), it would disable the join_next and it would
// not get called again, even if there are more connections to remove.
Some(res) = connections.join_next() => {
if let Err(e) = res {
if !e.is_panic() && !e.is_cancelled() {
warn!("unexpected error from joined connection task: {e:?}");
}
}
}
_ = cancellation_token.cancelled() => {
@@ -514,7 +523,7 @@ pub fn invalidate_cache(node_info: console::CachedNodeInfo) -> compute::ConnCfg
}
/// Try to connect to the compute node once.
#[tracing::instrument(name = "connect_once", skip_all)]
#[tracing::instrument(name = "connect_once", fields(pid = tracing::field::Empty), skip_all)]
async fn connect_to_compute_once(
node_info: &console::CachedNodeInfo,
timeout: time::Duration,

View File

@@ -208,6 +208,10 @@ impl GlobalConnPool {
} else {
info!("pool: reusing connection '{conn_info}'");
client.session.send(session_id)?;
tracing::Span::current().record(
"pid",
&tracing::field::display(client.inner.get_process_id()),
);
latency_timer.pool_hit();
latency_timer.success();
return Ok(Client::new(client, pool).await);
@@ -224,6 +228,12 @@ impl GlobalConnPool {
)
.await
};
if let Ok(client) = &new_client {
tracing::Span::current().record(
"pid",
&tracing::field::display(client.inner.get_process_id()),
);
}
match &new_client {
// clear the hash. it's no longer valid
@@ -257,12 +267,11 @@ impl GlobalConnPool {
}
_ => {}
}
// new_client.map(|inner| Client::new(inner, pool).await)
Ok(Client::new(new_client?, pool).await)
let new_client = new_client?;
Ok(Client::new(new_client, pool).await)
}
fn put(&self, conn_info: &ConnInfo, client: ClientInner, pid: i32) -> anyhow::Result<()> {
fn put(&self, conn_info: &ConnInfo, client: ClientInner) -> anyhow::Result<()> {
let conn_id = client.conn_id;
// We want to hold this open while we return. This ensures that the pool can't close
@@ -306,9 +315,9 @@ impl GlobalConnPool {
// do logging outside of the mutex
if returned {
info!(%conn_id, "pool: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}, pid={pid}");
info!(%conn_id, "pool: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}");
} else {
info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}, pid={pid}");
info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}");
}
Ok(())
@@ -385,7 +394,7 @@ impl ConnectMechanism for TokioMechanism<'_> {
// Wake up the destination if needed. Code here is a bit involved because
// we reuse the code from the usual proxy and we need to prepare few structures
// that this code expects.
#[tracing::instrument(skip_all)]
#[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
async fn connect_to_compute(
config: &config::ProxyConfig,
conn_info: &ConnInfo,
@@ -452,6 +461,7 @@ async fn connect_to_compute_once(
.connect_timeout(timeout)
.connect(tokio_postgres::NoTls)
.await?;
tracing::Span::current().record("pid", &tracing::field::display(client.get_process_id()));
let (tx, mut rx) = tokio::sync::watch::channel(session);
@@ -519,22 +529,6 @@ struct ClientInner {
conn_id: uuid::Uuid,
}
impl ClientInner {
pub async fn get_pid(&mut self) -> anyhow::Result<i32> {
let rows = self.inner.query("select pg_backend_pid();", &[]).await?;
if rows.len() != 1 {
Err(anyhow::anyhow!(
"expected 1 row from pg_backend_pid(), got {}",
rows.len()
))
} else {
let pid = rows[0].get(0);
info!(%pid, "got pid");
Ok(pid)
}
}
}
impl Client {
pub fn metrics(&self) -> Arc<MetricCounter> {
USAGE_METRICS.register(self.inner.as_ref().unwrap().ids.clone())
@@ -546,7 +540,6 @@ pub struct Client {
span: Span,
inner: Option<ClientInner>,
pool: Option<(ConnInfo, Arc<GlobalConnPool>)>,
pid: i32,
}
pub struct Discard<'a> {
@@ -556,12 +549,11 @@ pub struct Discard<'a> {
impl Client {
pub(self) async fn new(
mut inner: ClientInner,
inner: ClientInner,
pool: Option<(ConnInfo, Arc<GlobalConnPool>)>,
) -> Self {
Self {
conn_id: inner.conn_id,
pid: inner.get_pid().await.unwrap_or(-1),
inner: Some(inner),
span: Span::current(),
pool,
@@ -573,7 +565,6 @@ impl Client {
pool,
conn_id,
span: _,
pid: _,
} = self;
(
&mut inner
@@ -630,11 +621,10 @@ impl Drop for Client {
.expect("client inner should not be removed");
if let Some((conn_info, conn_pool)) = self.pool.take() {
let current_span = self.span.clone();
let pid = self.pid;
// return connection to the pool
tokio::task::spawn_blocking(move || {
let _span = current_span.enter();
let _ = conn_pool.put(&conn_info, client, pid);
let _ = conn_pool.put(&conn_info, client);
});
}
}

View File
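The proxy changes above stop issuing a `select pg_backend_pid()` round-trip and instead record the pid reported by `get_process_id()` into a span field declared as `tracing::field::Empty`. A small self-contained sketch of that tracing pattern (the pid value is a stand-in):

```rust
use tracing::info;

// Declare the field up front as Empty so it can be recorded once it is known.
#[tracing::instrument(name = "connect_once", fields(pid = tracing::field::Empty), skip_all)]
async fn connect_once() {
    // ... establish the connection ...
    let backend_pid = 4242; // stand-in for client.get_process_id()
    tracing::Span::current().record("pid", tracing::field::display(backend_pid));
    info!("connected"); // this event carries pid=4242 via the span context
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();
    connect_once().await;
}
```

Recording the field after the fact avoids holding connection setup hostage to an extra query, while every later log line in the span still carries the pid.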

@@ -250,7 +250,7 @@ pub async fn handle(
Ok(response)
}
#[instrument(name = "sql-over-http", skip_all)]
#[instrument(name = "sql-over-http", fields(pid = tracing::field::Empty), skip_all)]
async fn handle_inner(
request: Request<Body>,
sni_hostname: Option<String>,

View File

@@ -1,5 +1,5 @@
[toolchain]
channel = "1.73.0"
channel = "1.74.0"
profile = "default"
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
# https://rust-lang.github.io/rustup/concepts/profiles.html

View File

@@ -202,6 +202,7 @@ async fn main() -> anyhow::Result<()> {
logging::init(
LogFormat::from_config(&args.log_format)?,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stdout,
)?;
logging::replace_panic_hook_with_tracing_panic_hook().forget();
info!("version: {GIT_VERSION}");

View File

@@ -434,6 +434,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
logging::init(
LogFormat::from_config(&args.log_format)?,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stdout,
)?;
logging::replace_panic_hook_with_tracing_panic_hook().forget();
// initialize sentry if SENTRY_DSN is provided

View File

@@ -41,7 +41,12 @@ from urllib3.util.retry import Retry
from fixtures.broker import NeonBroker
from fixtures.log_helper import log
from fixtures.pageserver.allowed_errors import (
DEFAULT_PAGESERVER_ALLOWED_ERRORS,
scan_pageserver_log_for_errors,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.types import IndexPartDump
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
@@ -702,6 +707,7 @@ class NeonEnv:
self.port_distributor = config.port_distributor
self.s3_mock_server = config.mock_s3_server
self.neon_cli = NeonCli(env=self)
self.pagectl = Pagectl(env=self)
self.endpoints = EndpointFactory(self)
self.safekeepers: List[Safekeeper] = []
self.pageservers: List[NeonPageserver] = []
@@ -1222,6 +1228,7 @@ class NeonCli(AbstractNeonCli):
self,
new_branch_name: str,
tenant_id: Optional[TenantId] = None,
timeline_id: Optional[TimelineId] = None,
) -> TimelineId:
cmd = [
"timeline",
@@ -1234,6 +1241,9 @@ class NeonCli(AbstractNeonCli):
self.env.pg_version,
]
if timeline_id is not None:
cmd.extend(["--timeline-id", str(timeline_id)])
res = self.raw_cli(cmd)
res.check_returncode()
@@ -1558,6 +1568,20 @@ class ComputeCtl(AbstractNeonCli):
COMMAND = "compute_ctl"
class Pagectl(AbstractNeonCli):
"""
A typed wrapper around the `pagectl` utility CLI tool.
"""
COMMAND = "pagectl"
def dump_index_part(self, path: Path) -> IndexPartDump:
res = self.raw_cli(["index-part", "dump", str(path)])
res.check_returncode()
parsed = json.loads(res.stdout)
return IndexPartDump.from_json(parsed)
class NeonAttachmentService:
def __init__(self, env: NeonEnv):
self.env = env
@@ -1622,57 +1646,7 @@ class NeonPageserver(PgProtocol):
# env.pageserver.allowed_errors.append(".*could not open garage door.*")
#
# The entries in the list are regular expressions.
self.allowed_errors = [
# All tests print these, when starting up or shutting down
".*wal receiver task finished with an error: walreceiver connection handling failure.*",
".*Shutdown task error: walreceiver connection handling failure.*",
".*wal_connection_manager.*tcp connect error: Connection refused.*",
".*query handler for .* failed: Socket IO error: Connection reset by peer.*",
".*serving compute connection task.*exited with error: Postgres connection error.*",
".*serving compute connection task.*exited with error: Connection reset by peer.*",
".*serving compute connection task.*exited with error: Postgres query error.*",
".*Connection aborted: error communicating with the server: Transport endpoint is not connected.*",
# FIXME: replication patch for tokio_postgres regards any but CopyDone/CopyData message in CopyBoth stream as unexpected
".*Connection aborted: unexpected message from server*",
".*kill_and_wait_impl.*: wait successful.*",
".*query handler for 'pagestream.*failed: Broken pipe.*", # pageserver notices compute shut down
".*query handler for 'pagestream.*failed: Connection reset by peer.*", # pageserver notices compute shut down
# safekeeper connection can fail with this, in the window between timeline creation
# and streaming start
".*Failed to process query for timeline .*: state uninitialized, no data to read.*",
# Tests related to authentication and authorization print these
".*Error processing HTTP request: Forbidden",
# intentional failpoints
".*failpoint ",
# FIXME: These need investigation
".*manual_gc.*is_shutdown_requested\\(\\) called in an unexpected task or thread.*",
".*tenant_list: timeline is not found in remote index while it is present in the tenants registry.*",
".*Removing intermediate uninit mark file.*",
# Tenant::delete_timeline() can cause any of the four following errors.
# FIXME: we shouldn't be considering it an error: https://github.com/neondatabase/neon/issues/2946
".*could not flush frozen layer.*queue is in state Stopped", # when scheduling layer upload fails because the queue got closed before compaction got killed
".*wait for layer upload ops to complete.*", # .*Caused by:.*wait_completion aborted because upload queue was stopped
".*gc_loop.*Gc failed, retrying in.*timeline is Stopping", # When gc checks timeline state after acquiring layer_removal_cs
".*gc_loop.*Gc failed, retrying in.*: Cannot run GC iteration on inactive tenant", # Tenant::gc precondition
".*compaction_loop.*Compaction failed.*, retrying in.*timeline or pageserver is shutting down", # When compaction checks timeline state after acquiring layer_removal_cs
".*query handler for 'pagestream.*failed: Timeline .* was not found", # postgres reconnects while timeline_delete doesn't hold the tenant's timelines.lock()
".*query handler for 'pagestream.*failed: Timeline .* is not active", # timeline delete in progress
".*task iteration took longer than the configured period.*",
# this is until #3501
".*Compaction failed.*, retrying in [^:]+: Cannot run compaction iteration on inactive tenant",
# these can happen anytime we do compactions from background task and shutdown pageserver
r".*ERROR.*ancestor timeline \S+ is being stopped",
# this is expected given our collaborative shutdown approach for the UploadQueue
".*Compaction failed.*, retrying in .*: queue is in state Stopped.*",
# Pageserver timeline deletion should be polled until it gets 404, so ignore it globally
".*Error processing HTTP request: NotFound: Timeline .* was not found",
".*took more than expected to complete.*",
# these can happen during shutdown, but it should not be a reason to fail a test
".*completed, took longer than expected.*",
# AWS S3 may emit 500 errors for keys in a DeleteObjects response: we retry these
# and it is not a failure of our code when it happens.
".*DeleteObjects.*We encountered an internal error. Please try again.*",
]
self.allowed_errors: List[str] = list(DEFAULT_PAGESERVER_ALLOWED_ERRORS)
def timeline_dir(self, tenant_id: TenantId, timeline_id: Optional[TimelineId] = None) -> Path:
"""Get a timeline directory's path based on the repo directory of the test environment"""
@@ -1782,27 +1756,9 @@ class NeonPageserver(PgProtocol):
def assert_no_errors(self):
logfile = open(os.path.join(self.workdir, "pageserver.log"), "r")
error_or_warn = re.compile(r"\s(ERROR|WARN)")
errors = []
while True:
line = logfile.readline()
if not line:
break
errors = scan_pageserver_log_for_errors(logfile, self.allowed_errors)
if error_or_warn.search(line):
# Is this a torn log line? This happens when force-killing a process and restarting
# Example: "2023-10-25T09:38:31.752314Z WARN deletion executo2023-10-25T09:38:31.875947Z INFO version: git-env:0f9452f76e8ccdfc88291bccb3f53e3016f40192"
if re.match("\\d{4}-\\d{2}-\\d{2}T.+\\d{4}-\\d{2}-\\d{2}T.+INFO version.+", line):
continue
# It's an ERROR or WARN. Is it in the allow-list?
for a in self.allowed_errors:
if re.match(a, line):
break
else:
errors.append(line)
for error in errors:
for _lineno, error in errors:
log.info(f"not allowed error: {error.strip()}")
assert not errors
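
For reference, a minimal sketch of driving the new `Pagectl` wrapper from a test. This is not part of the diff; it assumes `env` is a `NeonEnv` fixture backed by `LocalFsStorage` remote storage and that the initial tenant/timeline exist:

```
# a sketch, assuming `env` is a NeonEnv fixture backed by LocalFsStorage
from fixtures.remote_storage import LocalFsStorage

assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
ip_path = env.pageserver_remote_storage.index_path(env.initial_tenant, env.initial_timeline)
# typed wrapper around `pagectl index-part dump`
dump = env.pagectl.dump_index_part(ip_path)
print(dump.disk_consistent_lsn, len(dump.layer_metadata))
```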

View File

@@ -0,0 +1,116 @@
#! /usr/bin/env python3
import argparse
import re
import sys
from typing import Iterable, List, Tuple
def scan_pageserver_log_for_errors(
input: Iterable[str], allowed_errors: List[str]
) -> List[Tuple[int, str]]:
error_or_warn = re.compile(r"\s(ERROR|WARN)")
errors = []
for lineno, line in enumerate(input, start=1):
if len(line) == 0:
continue
if error_or_warn.search(line):
# Is this a torn log line? This happens when force-killing a process and restarting
# Example: "2023-10-25T09:38:31.752314Z WARN deletion executo2023-10-25T09:38:31.875947Z INFO version: git-env:0f9452f76e8ccdfc88291bccb3f53e3016f40192"
if re.match("\\d{4}-\\d{2}-\\d{2}T.+\\d{4}-\\d{2}-\\d{2}T.+INFO version.+", line):
continue
# It's an ERROR or WARN. Is it in the allow-list?
for a in allowed_errors:
if re.match(a, line):
break
else:
errors.append((lineno, line))
return errors
DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
# All tests print these, when starting up or shutting down
".*wal receiver task finished with an error: walreceiver connection handling failure.*",
".*Shutdown task error: walreceiver connection handling failure.*",
".*wal_connection_manager.*tcp connect error: Connection refused.*",
".*query handler for .* failed: Socket IO error: Connection reset by peer.*",
".*serving compute connection task.*exited with error: Postgres connection error.*",
".*serving compute connection task.*exited with error: Connection reset by peer.*",
".*serving compute connection task.*exited with error: Postgres query error.*",
".*Connection aborted: error communicating with the server: Transport endpoint is not connected.*",
# FIXME: replication patch for tokio_postgres regards any but CopyDone/CopyData message in CopyBoth stream as unexpected
".*Connection aborted: unexpected message from server*",
".*kill_and_wait_impl.*: wait successful.*",
".*query handler for 'pagestream.*failed: Broken pipe.*", # pageserver notices compute shut down
".*query handler for 'pagestream.*failed: Connection reset by peer.*", # pageserver notices compute shut down
# safekeeper connection can fail with this, in the window between timeline creation
# and streaming start
".*Failed to process query for timeline .*: state uninitialized, no data to read.*",
# Tests related to authentication and authorization print these
".*Error processing HTTP request: Forbidden",
# intentional failpoints
".*failpoint ",
# FIXME: These need investigation
".*manual_gc.*is_shutdown_requested\\(\\) called in an unexpected task or thread.*",
".*tenant_list: timeline is not found in remote index while it is present in the tenants registry.*",
".*Removing intermediate uninit mark file.*",
# Tenant::delete_timeline() can cause any of the four following errors.
# FIXME: we shouldn't be considering it an error: https://github.com/neondatabase/neon/issues/2946
".*could not flush frozen layer.*queue is in state Stopped", # when scheduling layer upload fails because the queue got closed before compaction got killed
".*wait for layer upload ops to complete.*", # .*Caused by:.*wait_completion aborted because upload queue was stopped
".*gc_loop.*Gc failed, retrying in.*timeline is Stopping", # When gc checks timeline state after acquiring layer_removal_cs
".*gc_loop.*Gc failed, retrying in.*: Cannot run GC iteration on inactive tenant", # Tenant::gc precondition
".*compaction_loop.*Compaction failed.*, retrying in.*timeline or pageserver is shutting down", # When compaction checks timeline state after acquiring layer_removal_cs
".*query handler for 'pagestream.*failed: Timeline .* was not found", # postgres reconnects while timeline_delete doesn't hold the tenant's timelines.lock()
".*query handler for 'pagestream.*failed: Timeline .* is not active", # timeline delete in progress
".*task iteration took longer than the configured period.*",
# these can happen anytime we do compactions from background task and shutdown pageserver
r".*ERROR.*ancestor timeline \S+ is being stopped",
# this is expected given our collaborative shutdown approach for the UploadQueue
".*Compaction failed.*, retrying in .*: Other\\(queue is in state Stopped.*",
".*Compaction failed.*, retrying in .*: ShuttingDown",
# Pageserver timeline deletion should be polled until it gets 404, so ignore it globally
".*Error processing HTTP request: NotFound: Timeline .* was not found",
".*took more than expected to complete.*",
# these can happen during shutdown, but it should not be a reason to fail a test
".*completed, took longer than expected.*",
# AWS S3 may emit 500 errors for keys in a DeleteObjects response: we retry these
# and it is not a failure of our code when it happens.
".*DeleteObjects.*We encountered an internal error. Please try again.*",
)
def _check_allowed_errors(input):
allowed_errors: List[str] = list(DEFAULT_PAGESERVER_ALLOWED_ERRORS)
# add any test-specific patterns here; CLI parsing is not provided because
# copy-pasting regexes as command-line arguments without quoting errors is
# difficult.
errors = scan_pageserver_log_for_errors(input, allowed_errors)
for lineno, error in errors:
print(f"-:{lineno}: {error.strip()}", file=sys.stderr)
print(f"\n{len(errors)} not allowed errors", file=sys.stderr)
return errors
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="check input against pageserver global allowed_errors"
)
parser.add_argument(
"-i",
"--input",
type=argparse.FileType("r"),
default=sys.stdin,
help="Pageserver logs file. Reads from stdin if no file is provided.",
)
args = parser.parse_args()
errors = _check_allowed_errors(args.input)
sys.exit(len(errors) > 0)
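
Besides the CLI entry point, the scanner can also be reused programmatically; a minimal sketch, assuming a `pageserver.log` file is available in the current directory:

```
# a sketch: scan a downloaded pageserver log against the shared allow-list
from fixtures.pageserver.allowed_errors import (
    DEFAULT_PAGESERVER_ALLOWED_ERRORS,
    scan_pageserver_log_for_errors,
)

with open("pageserver.log") as logfile:
    offenders = scan_pageserver_log_for_errors(logfile, list(DEFAULT_PAGESERVER_ALLOWED_ERRORS))
for lineno, line in offenders:
    print(f"pageserver.log:{lineno}: {line.strip()}")
```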

View File

@@ -432,12 +432,18 @@ class PageserverHttpClient(requests.Session):
assert isinstance(res_json, dict)
return res_json
def timeline_compact(self, tenant_id: TenantId, timeline_id: TimelineId):
def timeline_compact(
self, tenant_id: TenantId, timeline_id: TimelineId, force_repartition=False
):
self.is_testing_enabled_or_skip()
query = {}
if force_repartition:
query["force_repartition"] = "true"
log.info(f"Requesting compact: tenant {tenant_id}, timeline {timeline_id}")
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/compact"
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/compact",
params=query,
)
log.info(f"Got compact request response code: {res.status_code}")
self.verbose_error(res)
@@ -466,12 +472,18 @@ class PageserverHttpClient(requests.Session):
res_json = res.json()
return res_json
def timeline_checkpoint(self, tenant_id: TenantId, timeline_id: TimelineId):
def timeline_checkpoint(
self, tenant_id: TenantId, timeline_id: TimelineId, force_repartition=False
):
self.is_testing_enabled_or_skip()
query = {}
if force_repartition:
query["force_repartition"] = "true"
log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}")
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint"
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint",
params=query,
)
log.info(f"Got checkpoint request response code: {res.status_code}")
self.verbose_error(res)
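
Usage sketch for the extended helpers (assuming `ps_http` is a `PageserverHttpClient` and the tenant/timeline exist); `force_repartition=True` is passed through as the `force_repartition=true` query parameter shown above:

```
# flush the open in-memory layer to disk, then run compaction; force_repartition
# asks the pageserver to recompute the key-space partitioning used for image layers
ps_http.timeline_checkpoint(tenant_id, timeline_id, force_repartition=True)
ps_http.timeline_compact(tenant_id, timeline_id, force_repartition=True)
```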

View File

@@ -0,0 +1,146 @@
from dataclasses import dataclass
from typing import Any, Dict, Tuple, Union
from fixtures.types import KEY_MAX, KEY_MIN, Key, Lsn
@dataclass
class IndexLayerMetadata:
@classmethod
def from_json(cls, d: Dict[str, Any]):
return {}
@dataclass(frozen=True)
class ImageLayerFileName:
lsn: Lsn
key_start: Key
key_end: Key
def to_str(self):
ret = (
f"{self.key_start.as_int():036X}-{self.key_end.as_int():036X}__{self.lsn.as_int():016X}"
)
assert self == parse_layer_file_name(ret)
return ret
@dataclass(frozen=True)
class DeltaLayerFileName:
lsn_start: Lsn
lsn_end: Lsn
key_start: Key
key_end: Key
def is_l0(self):
return self.key_start == KEY_MIN and self.key_end == KEY_MAX
def to_str(self):
ret = f"{self.key_start.as_int():036X}-{self.key_end.as_int():036X}__{self.lsn_start.as_int():016X}-{self.lsn_end.as_int():016X}"
assert self == parse_layer_file_name(ret)
return ret
LayerFileName = Union[ImageLayerFileName, DeltaLayerFileName]
class InvalidFileName(Exception):
pass
def parse_image_layer(f_name: str) -> Tuple[int, int, int]:
"""Parse an image layer file name. Return key start, key end, and snapshot lsn"""
parts = f_name.split("__")
if len(parts) != 2:
raise InvalidFileName(f"expecting two parts separated by '__', got: {parts}")
key_parts = parts[0].split("-")
if len(key_parts) != 2:
raise InvalidFileName(
f"expecting two key parts separated by '--' in parts[0], got: {key_parts}"
)
try:
return int(key_parts[0], 16), int(key_parts[1], 16), int(parts[1], 16)
except ValueError as e:
raise InvalidFileName(f"conversion error: {f_name}") from e
def parse_delta_layer(f_name: str) -> Tuple[int, int, int, int]:
"""Parse a delta layer file name. Return key start, key end, lsn start, and lsn end"""
parts = f_name.split("__")
if len(parts) != 2:
raise InvalidFileName(f"expecting two parts separated by '__', got: {parts}")
key_parts = parts[0].split("-")
if len(key_parts) != 2:
raise InvalidFileName(
f"expecting two key parts separated by '--' in parts[0], got: {key_parts}"
)
lsn_parts = parts[1].split("-")
if len(lsn_parts) != 2:
raise InvalidFileName(
f"expecting two lsn parts separated by '--' in parts[1], got: {lsn_parts}"
)
try:
return (
int(key_parts[0], 16),
int(key_parts[1], 16),
int(lsn_parts[0], 16),
int(lsn_parts[1], 16),
)
except ValueError as e:
raise InvalidFileName(f"conversion error: {f_name}") from e
def parse_layer_file_name(file_name: str) -> LayerFileName:
try:
key_start, key_end, lsn = parse_image_layer(file_name)
return ImageLayerFileName(lsn=Lsn(lsn), key_start=Key(key_start), key_end=Key(key_end))
except InvalidFileName:
pass
try:
key_start, key_end, lsn_start, lsn_end = parse_delta_layer(file_name)
return DeltaLayerFileName(
lsn_start=Lsn(lsn_start),
lsn_end=Lsn(lsn_end),
key_start=Key(key_start),
key_end=Key(key_end),
)
except InvalidFileName:
pass
raise ValueError()
def is_future_layer(layer_file_name: LayerFileName, disk_consistent_lsn: Lsn):
"""
Determines whether this layer file is considered to be in the future, meaning it will be
discarded during timeline initialization given the disk_consistent_lsn.
"""
if (
isinstance(layer_file_name, ImageLayerFileName)
and layer_file_name.lsn > disk_consistent_lsn
):
return True
elif (
isinstance(layer_file_name, DeltaLayerFileName)
and layer_file_name.lsn_end > disk_consistent_lsn + 1
):
return True
else:
return False
@dataclass
class IndexPartDump:
layer_metadata: Dict[LayerFileName, IndexLayerMetadata]
disk_consistent_lsn: Lsn
@classmethod
def from_json(cls, d: Dict[str, Any]) -> "IndexPartDump":
return IndexPartDump(
layer_metadata={
parse_layer_file_name(n): IndexLayerMetadata.from_json(v)
for n, v in d["layer_metadata"].items()
},
disk_consistent_lsn=Lsn(d["disk_consistent_lsn"]),
)
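
A short sketch of the parsing helpers above; the layer file names below are synthetic examples, not taken from a real timeline:

```
from fixtures.pageserver.types import (
    DeltaLayerFileName,
    ImageLayerFileName,
    is_future_layer,
    parse_layer_file_name,
)
from fixtures.types import Lsn

# full key range: 36 hex digits per key bound
key_range = f"{0:036X}-{(1 << 144) - 1:036X}"

# delta layer name: "<key_start>-<key_end>__<lsn_start>-<lsn_end>" (hex)
delta = parse_layer_file_name(f"{key_range}__0000000001696070-0000000001696200")
assert isinstance(delta, DeltaLayerFileName) and delta.is_l0()

# image layer name: "<key_start>-<key_end>__<lsn>" (hex)
image = parse_layer_file_name(f"{key_range}__0000000001696200")
assert isinstance(image, ImageLayerFileName)
# an image layer whose LSN lies beyond disk_consistent_lsn counts as a "future" layer
assert is_future_layer(image, Lsn(0x1696070))
```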

View File

@@ -12,6 +12,7 @@ import boto3
from mypy_boto3_s3 import S3Client
from fixtures.log_helper import log
from fixtures.pageserver.types import LayerFileName
from fixtures.types import TenantId, TimelineId
TIMELINE_INDEX_PART_FILE_NAME = "index_part.json"
@@ -87,6 +88,11 @@ class LocalFsStorage:
def timeline_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
return self.tenant_path(tenant_id) / "timelines" / str(timeline_id)
def layer_path(
self, tenant_id: TenantId, timeline_id: TimelineId, layer_file_name: LayerFileName
):
return self.timeline_path(tenant_id, timeline_id) / layer_file_name.to_str()
def index_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
return self.timeline_path(tenant_id, timeline_id) / TIMELINE_INDEX_PART_FILE_NAME

View File

@@ -1,4 +1,5 @@
import random
from dataclasses import dataclass
from functools import total_ordering
from typing import Any, Type, TypeVar, Union
@@ -36,6 +37,11 @@ class Lsn:
return NotImplemented
return self.lsn_int < other.lsn_int
def __gt__(self, other: Any) -> bool:
if not isinstance(other, Lsn):
raise NotImplementedError
return self.lsn_int > other.lsn_int
def __eq__(self, other: Any) -> bool:
if not isinstance(other, Lsn):
return NotImplemented
@@ -47,9 +53,32 @@ class Lsn:
return NotImplemented
return self.lsn_int - other.lsn_int
def __add__(self, other: Union[int, "Lsn"]) -> "Lsn":
if isinstance(other, int):
return Lsn(self.lsn_int + other)
elif isinstance(other, Lsn):
return Lsn(self.lsn_int + other.lsn_int)
else:
raise NotImplementedError
def __hash__(self) -> int:
return hash(self.lsn_int)
def as_int(self) -> int:
return self.lsn_int
@dataclass(frozen=True)
class Key:
key_int: int
def as_int(self) -> int:
return self.key_int
KEY_MAX = Key(0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF)
KEY_MIN = Key(0)
@total_ordering
class Id:

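A quick sketch exercising the new comparison and arithmetic helpers (illustrative values only):

```
from fixtures.types import KEY_MAX, KEY_MIN, Key, Lsn

a = Lsn(0x16B5A50)
assert (a + 8).as_int() == 0x16B5A58   # __add__ with an int offset
assert a + Lsn(8) == Lsn(0x16B5A58)    # __add__ with another Lsn
assert Lsn(10) > Lsn(5)                # __gt__
assert KEY_MIN == Key(0)
assert KEY_MAX.as_int() == (1 << 144) - 1  # 36 hex digits of F
```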
View File

@@ -6,7 +6,16 @@ import subprocess
import threading
import time
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, TypeVar
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
List,
Optional,
Tuple,
TypeVar,
)
from urllib.parse import urlencode
import allure
@@ -14,6 +23,10 @@ import zstandard
from psycopg2.extensions import cursor
from fixtures.log_helper import log
from fixtures.pageserver.types import (
parse_delta_layer,
parse_image_layer,
)
if TYPE_CHECKING:
from fixtures.neon_fixtures import PgBin
@@ -193,26 +206,6 @@ def get_timeline_dir_size(path: Path) -> int:
return sz
def parse_image_layer(f_name: str) -> Tuple[int, int, int]:
"""Parse an image layer file name. Return key start, key end, and snapshot lsn"""
parts = f_name.split("__")
key_parts = parts[0].split("-")
return int(key_parts[0], 16), int(key_parts[1], 16), int(parts[1], 16)
def parse_delta_layer(f_name: str) -> Tuple[int, int, int, int]:
"""Parse a delta layer file name. Return key start, key end, lsn start, and lsn end"""
parts = f_name.split("__")
key_parts = parts[0].split("-")
lsn_parts = parts[1].split("-")
return (
int(key_parts[0], 16),
int(key_parts[1], 16),
int(lsn_parts[0], 16),
int(lsn_parts[1], 16),
)
def get_scale_for_db(size_mb: int) -> int:
"""Returns pgbench scale factor for given target db size in MB.

View File

@@ -24,8 +24,6 @@ def check_backpressure(endpoint: Endpoint, stop_event: threading.Event, polling_
log.info("checks started")
with pg_cur(endpoint) as cur:
cur.execute("CREATE EXTENSION neon") # TODO move it to neon_fixtures?
cur.execute("select pg_size_bytes(current_setting('max_replication_write_lag'))")
res = cur.fetchone()
max_replication_write_lag_bytes = res[0]
@@ -102,9 +100,13 @@ def test_backpressure_received_lsn_lag(neon_env_builder: NeonEnvBuilder):
# Create a branch for us
env.neon_cli.create_branch("test_backpressure")
endpoint = env.endpoints.create_start(
endpoint = env.endpoints.create(
"test_backpressure", config_lines=["max_replication_write_lag=30MB"]
)
# don't skip pg_catalog updates - it runs CREATE EXTENSION neon
# which is needed for backpressure_lsns() to work
endpoint.respec(skip_pg_catalog_updates=False)
endpoint.start()
log.info("postgres is running on 'test_backpressure' branch")
# setup check thread

View File

@@ -46,7 +46,10 @@ from fixtures.utils import query_scalar
# Because the delta layer D covering lsn1 is corrupted, creating a branch
# starting from lsn1 should return an error as follows:
# could not find data for key ... at LSN ..., for request at LSN ...
def test_branch_and_gc(neon_simple_env: NeonEnv):
def test_branch_and_gc(neon_simple_env: NeonEnv, build_type: str):
if build_type == "debug":
pytest.skip("times out in debug builds")
env = neon_simple_env
pageserver_http_client = env.pageserver.http_client()

View File

@@ -114,6 +114,7 @@ def test_timeline_init_break_before_checkpoint(neon_env_builder: NeonEnvBuilder)
[
".*Failed to process timeline dir contents.*Timeline has no ancestor and no layer files.*",
".*Timeline got dropped without initializing, cleaning its files.*",
".*Failed to load index_part from remote storage, failed creation?.*",
]
)
@@ -143,6 +144,58 @@ def test_timeline_init_break_before_checkpoint(neon_env_builder: NeonEnvBuilder)
), "pageserver should clean its temp timeline files on timeline creation failure"
def test_timeline_init_break_before_checkpoint_recreate(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
env.pageserver.allowed_errors.extend(
[
".*Failed to process timeline dir contents.*Timeline has no ancestor and no layer files.*",
".*Timeline got dropped without initializing, cleaning its files.*",
".*Failed to load index_part from remote storage, failed creation?.*",
]
)
tenant_id = env.initial_tenant
timelines_dir = env.pageserver.timeline_dir(tenant_id)
old_tenant_timelines = env.neon_cli.list_timelines(tenant_id)
initial_timeline_dirs = [d for d in timelines_dir.iterdir()]
# Some fixed timeline ID (like control plane does)
timeline_id = TimelineId("1080243c1f76fe3c5147266663c9860b")
# Introduce failpoint during timeline init (some intermediate files are on disk), before it's checkpointed.
pageserver_http.configure_failpoints(("before-checkpoint-new-timeline", "return"))
with pytest.raises(Exception, match="before-checkpoint-new-timeline"):
_ = env.neon_cli.create_timeline(
"test_timeline_init_break_before_checkpoint", tenant_id, timeline_id
)
# Restart the page server
env.pageserver.restart(immediate=True)
# Creating the timeline didn't finish. The other timelines on the tenant should still be present and work normally.
new_tenant_timelines = env.neon_cli.list_timelines(tenant_id)
assert (
new_tenant_timelines == old_tenant_timelines
), f"Pageserver after restart should ignore non-initialized timelines for tenant {tenant_id}"
timeline_dirs = [d for d in timelines_dir.iterdir()]
assert (
timeline_dirs == initial_timeline_dirs
), "pageserver should clean its temp timeline files on timeline creation failure"
# Disable the failpoint again
pageserver_http.configure_failpoints(("before-checkpoint-new-timeline", "off"))
# creating the branch should have worked now
new_timeline_id = env.neon_cli.create_timeline(
"test_timeline_init_break_before_checkpoint", tenant_id, timeline_id
)
assert timeline_id == new_timeline_id
def test_timeline_create_break_after_uninit_mark(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()

View File

@@ -245,6 +245,19 @@ def test_ddl_forwarding(ddl: DdlForwardingContext):
raise AssertionError("Could not count databases")
assert result[0] == 0, "Database 'failure' still exists after drop"
# We don't have compute_ctl here, so create neon_superuser manually
cur.execute("CREATE ROLE neon_superuser NOLOGIN CREATEDB CREATEROLE")
with pytest.raises(psycopg2.InternalError):
cur.execute("ALTER ROLE neon_superuser LOGIN")
with pytest.raises(psycopg2.InternalError):
cur.execute("CREATE DATABASE trololobus WITH OWNER neon_superuser")
cur.execute("CREATE DATABASE trololobus")
with pytest.raises(psycopg2.InternalError):
cur.execute("ALTER DATABASE trololobus OWNER TO neon_superuser")
conn.close()

View File

@@ -1,5 +1,6 @@
import time
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
@@ -15,7 +16,11 @@ from fixtures.utils import query_scalar
# and then download them back.
def test_basic_eviction(
neon_env_builder: NeonEnvBuilder,
build_type: str,
):
if build_type == "debug":
pytest.skip("times out in debug builds")
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start(

View File

@@ -0,0 +1,222 @@
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.pageserver.types import (
DeltaLayerFileName,
ImageLayerFileName,
is_future_layer,
)
from fixtures.pageserver.utils import (
wait_for_last_record_lsn,
wait_for_upload_queue_empty,
wait_until_tenant_active,
)
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from fixtures.types import Lsn
from fixtures.utils import query_scalar, wait_until
def test_issue_5878(neon_env_builder: NeonEnvBuilder):
"""
Regression test for issue https://github.com/neondatabase/neon/issues/5878 .
Create a situation where IndexPart contains an image layer from a future
(i.e., image layer > IndexPart::disk_consistent_lsn).
Detach.
Attach.
Wait for tenant to finish load_layer_map (by waiting for it to become active).
Wait for any remote timeline client ops to finish that the attach started.
Integrity-check the index part.
Before fixing the issue, load_layer_map would schedule removal of the future
image layer. A compaction run could later re-create the image layer with
the same file name, scheduling a PUT.
Due to lack of an upload queue barrier, the PUT and DELETE could be re-ordered.
The result was IndexPart referencing a non-existent object.
"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start()
ps_http = env.pageserver.http_client()
l0_l1_threshold = 3
image_creation_threshold = 1
tenant_config = {
"gc_period": "0s", # disable GC (shouldn't matter for this test but still)
"compaction_period": "0s", # we want to control when compaction runs
"checkpoint_timeout": "24h", # something we won't reach
"checkpoint_distance": f"{50 * (1024**2)}", # something we won't reach, we checkpoint manually
"image_creation_threshold": f"{image_creation_threshold}",
"compaction_threshold": f"{l0_l1_threshold}",
"compaction_target_size": f"{128 * (1024**3)}", # make it so that we only have 1 partition => image coverage for delta layers => enables gc of delta layers
}
tenant_id, timeline_id = env.neon_cli.create_tenant(conf=tenant_config)
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
def get_index_part():
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
ip_path = env.pageserver_remote_storage.index_path(tenant_id, timeline_id)
return env.pagectl.dump_index_part(ip_path)
def get_future_layers():
ip = get_index_part()
future_layers = [
layer_file_name
for layer_file_name in ip.layer_metadata.keys()
if is_future_layer(layer_file_name, ip.disk_consistent_lsn)
]
return future_layers
assert len(get_future_layers()) == 0
current = get_index_part()
assert len(set(current.layer_metadata.keys())) == 1
layer_file_name = list(current.layer_metadata.keys())[0]
assert isinstance(layer_file_name, DeltaLayerFileName)
assert layer_file_name.is_l0(), f"{layer_file_name}"
log.info("force image layer creation in the future by writing some data into in-memory layer")
# Create a number of layers in the tenant
with endpoint.cursor() as cur:
cur.execute("CREATE TABLE foo (t text)")
iters = l0_l1_threshold * image_creation_threshold
for i in range(0, iters):
cur.execute(
f"""
INSERT INTO foo
SELECT '{i}' || g
FROM generate_series(1, 10000) g
"""
)
last_record_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
wait_for_last_record_lsn(ps_http, tenant_id, timeline_id, last_record_lsn)
# 0..iters-1: create a stack of delta layers
# iters: leave a non-empty in-memory layer which we'll use for image layer generation
if i < iters - 1:
ps_http.timeline_checkpoint(tenant_id, timeline_id, force_repartition=True)
assert (
len(
[
layer
for layer in ps_http.layer_map_info(
tenant_id, timeline_id
).historic_layers
if layer.kind == "Image"
]
)
== 0
)
endpoint.stop()
wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id)
ip = get_index_part()
assert len(ip.layer_metadata.keys())
assert (
ip.disk_consistent_lsn < last_record_lsn
), "sanity check for what above loop is supposed to do"
# create the image layer from the future
ps_http.timeline_compact(tenant_id, timeline_id, force_repartition=True)
assert (
len(
[
layer
for layer in ps_http.layer_map_info(tenant_id, timeline_id).historic_layers
if layer.kind == "Image"
]
)
== 1
)
wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id)
future_layers = get_future_layers()
assert len(future_layers) == 1
future_layer = future_layers[0]
assert isinstance(future_layer, ImageLayerFileName)
assert future_layer.lsn == last_record_lsn
log.info(
f"got layer from the future: lsn={future_layer.lsn} disk_consistent_lsn={ip.disk_consistent_lsn} last_record_lsn={last_record_lsn}"
)
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
future_layer_path = env.pageserver_remote_storage.layer_path(
tenant_id, timeline_id, future_layer
)
log.info(f"future layer path: {future_layer_path}")
pre_stat = future_layer_path.stat()
time.sleep(1.1) # so that we can use a change in pre_stat.st_mtime to detect overwrites
# force removal of layers from the future
tenant_conf = ps_http.tenant_config(tenant_id)
ps_http.tenant_detach(tenant_id)
failpoint_name = "before-delete-layer-pausable"
ps_http.configure_failpoints((failpoint_name, "pause"))
ps_http.tenant_attach(tenant_id, tenant_conf.tenant_specific_overrides)
wait_until_tenant_active(ps_http, tenant_id)
# Ensure the IndexPart upload that unlinks the layer file finishes, i.e., doesn't clog the queue.
def future_layer_is_gone_from_index_part():
future_layers = set(get_future_layers())
assert future_layer not in future_layers
wait_until(10, 0.5, future_layer_is_gone_from_index_part)
# NB: the layer file is unlinked from the index part now, but, because we made the delete
# operation stuck, the layer file itself is still in remote_storage
def delete_at_pause_point():
assert env.pageserver.log_contains(f".*{tenant_id}.*at failpoint.*{failpoint_name}")
wait_until(10, 0.5, delete_at_pause_point)
assert future_layer_path.exists()
# wait for re-ingestion of the WAL from safekeepers into the in-memory layer
# (this happens in parallel to the above)
wait_for_last_record_lsn(ps_http, tenant_id, timeline_id, last_record_lsn)
# re-do image layer generation
# This will produce the same image layer and queue an upload.
# However, we still have the deletion for the layer queued, stuck on the failpoint.
# An incorrect implementation would let the PUT execute before the DELETE.
# The later code in this test asserts that this doesn't happen.
ps_http.timeline_compact(tenant_id, timeline_id, force_repartition=True)
# Let things sit for some time; a good implementation makes no progress because
# we can't execute the PUT before the DELETE. A bad implementation would do that.
max_race_opportunity_window = 4
start = time.monotonic()
while True:
post_stat = future_layer_path.stat()
assert (
pre_stat.st_mtime == post_stat.st_mtime
), "observed PUT overtake the stucked DELETE => bug isn't fixed yet"
if time.monotonic() - start > max_race_opportunity_window:
log.info(
"a correct implementation would never let the later PUT overtake the earlier DELETE"
)
break
time.sleep(1)
# Window has passed, unstuck the delete, let upload queue drain.
log.info("unstuck the DELETE")
ps_http.configure_failpoints(("before-delete-layer-pausable", "off"))
wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id)
# Examine the resulting S3 state.
log.info("integrity-check the remote storage")
ip = get_index_part()
for layer_file_name in ip.layer_metadata.keys():
layer_path = env.pageserver_remote_storage.layer_path(
tenant_id, timeline_id, layer_file_name
)
assert layer_path.exists(), f"{layer_file_name.to_str()}"
log.info("assert that the overwritten layer won")
final_stat = future_layer_path.stat()
assert final_stat.st_mtime != pre_stat.st_mtime

View File

@@ -0,0 +1,74 @@
import os
import random
import threading
import time
from typing import List
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import query_scalar
def test_local_file_cache_unlink(neon_simple_env: NeonEnv):
env = neon_simple_env
cache_dir = os.path.join(env.repo_dir, "file_cache")
os.mkdir(cache_dir)
env.neon_cli.create_branch("test_local_file_cache_unlink", "empty")
endpoint = env.endpoints.create_start(
"test_local_file_cache_unlink",
config_lines=[
"shared_buffers='1MB'",
f"neon.file_cache_path='{cache_dir}/file.cache'",
"neon.max_file_cache_size='64MB'",
"neon.file_cache_size_limit='10MB'",
],
)
cur = endpoint.connect().cursor()
n_rows = 100000
n_threads = 20
n_updates_per_thread = 10000
n_updates_per_connection = 1000
n_total_updates = n_threads * n_updates_per_thread
cur.execute("CREATE TABLE lfctest (id int4 PRIMARY KEY, n int) WITH (fillfactor=10)")
cur.execute(f"INSERT INTO lfctest SELECT g, 1 FROM generate_series(1, {n_rows}) g")
# Start threads that will perform random UPDATEs. Each UPDATE
# increments the counter on the row, so that we can check at the
# end that the sum of all the counters matches the number of updates
# performed (plus the initial 1 on each row).
#
# Furthermore, each thread reconnects after every 1000 updates.
def run_updates():
n_updates_performed = 0
conn = endpoint.connect()
cur = conn.cursor()
for _ in range(n_updates_per_thread):
id = random.randint(1, n_rows)
cur.execute(f"UPDATE lfctest SET n = n + 1 WHERE id = {id}")
n_updates_performed += 1
if n_updates_performed % n_updates_per_connection == 0:
cur.close()
conn.close()
conn = endpoint.connect()
cur = conn.cursor()
threads: List[threading.Thread] = []
for _i in range(n_threads):
thread = threading.Thread(target=run_updates, args=(), daemon=True)
thread.start()
threads.append(thread)
time.sleep(5)
new_cache_dir = os.path.join(env.repo_dir, "file_cache_new")
os.rename(cache_dir, new_cache_dir)
for thread in threads:
thread.join()
assert query_scalar(cur, "SELECT SUM(n) FROM lfctest") == n_total_updates + n_rows

View File

@@ -0,0 +1,28 @@
from contextlib import closing
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
# Verify that the neon extension is installed and has the correct version.
def test_neon_extension(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_create_extension_neon")
endpoint_main = env.endpoints.create("test_create_extension_neon")
# don't skip pg_catalog updates - it runs CREATE EXTENSION neon
endpoint_main.respec(skip_pg_catalog_updates=False)
endpoint_main.start()
log.info("postgres is running on 'test_create_extension_neon' branch")
with closing(endpoint_main.connect()) as conn:
with conn.cursor() as cur:
cur.execute("SELECT extversion from pg_extension where extname='neon'")
# If this fails, it means the extension is either not installed
# or was updated and the version is different.
#
# IMPORTANT:
# If the version has changed, the test should be updated.
# Ensure that the default version is also updated in the neon.control file
assert cur.fetchone() == ("1.1",)

View File

@@ -144,7 +144,10 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder, generations: bool)
# Test that repeatedly kills and restarts the page server, while the
# safekeeper and compute node keep running.
@pytest.mark.timeout(540)
def test_pageserver_chaos(neon_env_builder: NeonEnvBuilder):
def test_pageserver_chaos(neon_env_builder: NeonEnvBuilder, build_type: str):
if build_type == "debug":
pytest.skip("times out in debug builds")
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.enable_scrub_on_exit()

View File

@@ -1,6 +1,7 @@
import enum
import os
import shutil
from threading import Thread
import pytest
from fixtures.log_helper import log
@@ -27,7 +28,7 @@ from fixtures.remote_storage import (
available_s3_storages,
)
from fixtures.types import TenantId
from fixtures.utils import run_pg_bench_small
from fixtures.utils import run_pg_bench_small, wait_until
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
@@ -399,4 +400,78 @@ def test_tenant_delete_is_resumed_on_attach(
)
def test_long_timeline_create_cancelled_by_tenant_delete(neon_env_builder: NeonEnvBuilder):
"""Reproduction of 2023-11-23 stuck tenants investigation"""
# do not use default tenant/timeline creation because it would output the failpoint log message too early
env = neon_env_builder.init_configs()
env.start()
pageserver_http = env.pageserver.http_client()
# happens with the cancellation bailing flushing loop earlier, leaving disk_consistent_lsn at zero
env.pageserver.allowed_errors.append(
".*Timeline got dropped without initializing, cleaning its files"
)
# the error response produced by hit_pausable_failpoint_and_later_fail
env.pageserver.allowed_errors.append(
f".*Error processing HTTP request: InternalServerError\\(new timeline {env.initial_tenant}/{env.initial_timeline} has invalid disk_consistent_lsn"
)
pageserver_http.tenant_create(env.initial_tenant)
failpoint = "flush-layer-cancel-after-writing-layer-out-pausable"
pageserver_http.configure_failpoints((failpoint, "pause"))
def hit_pausable_failpoint_and_later_fail():
with pytest.raises(
PageserverApiException, match="new timeline \\S+ has invalid disk_consistent_lsn"
):
pageserver_http.timeline_create(
env.pg_version, env.initial_tenant, env.initial_timeline
)
def start_deletion():
pageserver_http.tenant_delete(env.initial_tenant)
def has_hit_failpoint():
assert env.pageserver.log_contains(f"at failpoint {failpoint}") is not None
def deletion_has_started_waiting_for_timelines():
assert env.pageserver.log_contains("Waiting for timelines...") is not None
def tenant_is_deleted():
try:
pageserver_http.tenant_status(env.initial_tenant)
except PageserverApiException as e:
assert e.status_code == 404
else:
raise RuntimeError("tenant was still accessible")
creation = Thread(target=hit_pausable_failpoint_and_later_fail)
creation.start()
deletion = None
try:
wait_until(10, 1, has_hit_failpoint)
# it should start ok, sync up with the stuck creation, then fail because disk_consistent_lsn was not updated
# then deletion should fail and set the tenant broken
deletion = Thread(target=start_deletion)
deletion.start()
wait_until(10, 1, deletion_has_started_waiting_for_timelines)
pageserver_http.configure_failpoints((failpoint, "off"))
creation.join()
deletion.join()
wait_until(10, 1, tenant_is_deleted)
finally:
creation.join()
if deletion is not None:
deletion.join()
# TODO test concurrent deletions with "hang" failpoint

View File

@@ -307,7 +307,7 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
)
gc_thread = Thread(target=lambda: do_gc_target(pageserver_http, tenant_id, timeline_id))
gc_thread.start()
time.sleep(1)
time.sleep(5)
# By now the gc task is spawned but sleeping for another second due to the failpoint.
log.info("detaching tenant")

View File

@@ -134,10 +134,11 @@ def wait_for_pageserver_catchup(endpoint_main: Endpoint, polling_interval=1, tim
res = endpoint_main.safe_psql(
"""
SELECT
pg_size_pretty(pg_cluster_size()),
pg_size_pretty(neon.pg_cluster_size()),
pg_wal_lsn_diff(pg_current_wal_flush_lsn(), received_lsn) as received_lsn_lag
FROM backpressure_lsns();
"""
FROM neon.backpressure_lsns();
""",
dbname="postgres",
)[0]
log.info(f"pg_cluster_size = {res[0]}, received_lsn_lag = {res[1]}")
received_lsn_lag = res[1]
@@ -152,17 +153,20 @@ def test_timeline_size_quota(neon_env_builder: NeonEnvBuilder):
wait_for_timeline_size_init(client, tenant=env.initial_tenant, timeline=new_timeline_id)
endpoint_main = env.endpoints.create_start(
endpoint_main = env.endpoints.create(
"test_timeline_size_quota",
# Set small limit for the test
config_lines=["neon.max_cluster_size=30MB"],
)
# don't skip pg_catalog updates - it runs CREATE EXTENSION neon
# which is needed for pg_cluster_size() to work
endpoint_main.respec(skip_pg_catalog_updates=False)
endpoint_main.start()
log.info("postgres is running on 'test_timeline_size_quota' branch")
with closing(endpoint_main.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE EXTENSION neon") # TODO move it to neon_fixtures?
cur.execute("CREATE TABLE foo (t text)")
wait_for_pageserver_catchup(endpoint_main)
@@ -211,7 +215,7 @@ def test_timeline_size_quota(neon_env_builder: NeonEnvBuilder):
wait_for_pageserver_catchup(endpoint_main)
cur.execute("SELECT * from pg_size_pretty(pg_cluster_size())")
cur.execute("SELECT * from pg_size_pretty(neon.pg_cluster_size())")
pg_cluster_size = cur.fetchone()
log.info(f"pg_cluster_size = {pg_cluster_size}")

View File

@@ -602,7 +602,10 @@ async def run_wal_lagging(env: NeonEnv, endpoint: Endpoint, test_output_dir: Pat
# The test takes more than default 5 minutes on Postgres 16,
# see https://github.com/neondatabase/neon/issues/5305
@pytest.mark.timeout(600)
def test_wal_lagging(neon_env_builder: NeonEnvBuilder, test_output_dir: Path):
def test_wal_lagging(neon_env_builder: NeonEnvBuilder, test_output_dir: Path, build_type: str):
if build_type == "debug":
pytest.skip("times out in debug builds")
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()

View File

@@ -1,14 +1,19 @@
import sys
import tarfile
import tempfile
from pathlib import Path
import pytest
import zstandard
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
VanillaPostgres,
)
from fixtures.port_distributor import PortDistributor
from fixtures.types import TenantId, TimelineId
from fixtures.remote_storage import LocalFsStorage
from fixtures.types import Lsn, TenantId, TimelineId
@pytest.mark.skipif(
@@ -53,3 +58,70 @@ def test_wal_restore(
)
restored.start()
assert restored.safe_psql("select count(*) from t", user="cloud_admin") == [(300000,)]
def decompress_zstd(
input_file_name: Path,
output_dir: Path,
):
log.info(f"decompressing zstd to: {output_dir}")
output_dir.mkdir(mode=0o750, parents=True, exist_ok=True)
with tempfile.TemporaryFile(suffix=".tar") as temp:
decompressor = zstandard.ZstdDecompressor()
with open(input_file_name, "rb") as input_file:
decompressor.copy_stream(input_file, temp)
temp.seek(0)
with tarfile.open(fileobj=temp) as tfile:
tfile.extractall(path=output_dir)
def test_wal_restore_initdb(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
test_output_dir: Path,
port_distributor: PortDistributor,
base_dir: Path,
pg_distrib_dir: Path,
):
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main")
endpoint.safe_psql("create table t as select generate_series(1,300000)")
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
original_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
env.pageserver.stop()
port = port_distributor.get_port()
data_dir = test_output_dir / "pgsql.restored"
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
initdb_zst_path = (
env.pageserver_remote_storage.timeline_path(tenant_id, timeline_id) / "initdb.tar.zst"
)
decompress_zstd(initdb_zst_path, data_dir)
with VanillaPostgres(
data_dir, PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version), port, init=False
) as restored:
pg_bin.run_capture(
[
str(base_dir / "libs" / "utils" / "scripts" / "restore_from_wal_initdb.sh"),
str(pg_distrib_dir / f"v{env.pg_version}/bin"),
str(
test_output_dir
/ "repo"
/ "safekeepers"
/ "sk1"
/ str(tenant_id)
/ str(timeline_id)
),
str(data_dir),
str(port),
]
)
restored.start()
restored_lsn = Lsn(
restored.safe_psql("SELECT pg_current_wal_flush_lsn()", user="cloud_admin")[0][0]
)
log.info(f"original lsn: {original_lsn}, restored lsn: {restored_lsn}")
assert restored.safe_psql("select count(*) from t", user="cloud_admin") == [(300000,)]

View File

@@ -4,15 +4,15 @@ commands:
- name: cgconfigparser
user: root
sysvInitAction: sysinit
shell: "cgconfigparser -l /etc/cgconfig.conf -s 1664"
shell: 'cgconfigparser -l /etc/cgconfig.conf -s 1664'
- name: pgbouncer
user: nobody
sysvInitAction: respawn
shell: "/usr/local/bin/pgbouncer /etc/pgbouncer.ini"
shell: '/usr/local/bin/pgbouncer /etc/pgbouncer.ini'
- name: postgres-exporter
user: nobody
sysvInitAction: respawn
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres" /bin/postgres_exporter --extend.query-path /etc/postgres_exporter_queries.yml'
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres" /bin/postgres_exporter'
shutdownHook: |
su -p postgres --session-command '/usr/local/bin/pg_ctl stop -D /var/db/postgres/compute/pgdata -m fast --wait -t 10'
files:
@@ -46,21 +46,6 @@ files:
}
memory {}
}
- filename: postgres_exporter_queries.yml
content: |
postgres_exporter_pg_database_size:
query: "SELECT pg_database.datname, pg_database_size(pg_database.datname) as bytes, 42 as fourtytwo FROM pg_database"
cache_seconds: 30
metrics:
- datname:
usage: "LABEL"
description: "Name of the database"
- bytes:
usage: "GAUGE"
description: "Disk space used by the database"
- fourtytwo:
usage: "GAUGE"
description: "fourtytwo"
build: |
# Build cgroup-tools
#
@@ -129,12 +114,10 @@ merge: |
COPY cgconfig.conf /etc/cgconfig.conf
COPY pgbouncer.ini /etc/pgbouncer.ini
COPY postgres_exporter_queries.yml /etc/postgres_exporter_queries.yml
RUN set -e \
&& chown postgres:postgres /etc/pgbouncer.ini \
&& chmod 0644 /etc/pgbouncer.ini \
&& chmod 0644 /etc/cgconfig.conf \
&& chmod 0644 /etc/postgres_exporter_queries.yml
&& chmod 0644 /etc/cgconfig.conf
COPY --from=libcgroup-builder /libcgroup-install/bin/* /usr/bin/
COPY --from=libcgroup-builder /libcgroup-install/lib/* /usr/lib/

View File

@@ -68,6 +68,9 @@ tracing-core = { version = "0.1" }
tungstenite = { version = "0.20" }
url = { version = "2", features = ["serde"] }
uuid = { version = "1", features = ["serde", "v4"] }
zstd = { version = "0.12" }
zstd-safe = { version = "6", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] }
zstd-sys = { version = "2", default-features = false, features = ["legacy", "std", "zdict_builder"] }
[build-dependencies]
anyhow = { version = "1", features = ["backtrace"] }