Compare commits

..

28 Commits

Author SHA1 Message Date
Vlad Lazar
11a97bbfd6 pageserver: handle rel drops correctly in rel size cache 2025-05-14 17:09:01 +02:00
Alex Chi Z.
81fd652151 fix(pageserver): use better estimation for compaction memory usage (#11904)
## Problem

Hopefully resolves `test_gc_feedback` flakiness.

## Summary of changes

`accumulated_values` should not exceed 512MB to avoid OOM. Previously we
only use number of items, which is not a good estimation.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-05-14 08:32:55 +00:00
Elizabeth Murray
d47e88e353 Update the pgrag version in the compute dockerfile. (#11867)
## Problem

The extensions test are hanging because of pgrag. The new version of
pgrag contains a fix for the hang.

## Summary of changes
2025-05-14 07:00:59 +00:00
Vlad Lazar
045ae13e06 pageserver: make imports work with tenant shut downs (#11855)
## Problem

Lifetime of imported timelines (and implicitly the import background
task) has some shortcomings:
1. Timeline activation upon import completion is tricky. Previously, a
timeline that finished importing
after a tenant detach would not get activated and there's concerns about
the safety of activating
concurrently with shut-down.
2. Import jobs can prevent tenant shut down since they hold the tenant
gate

## Summary of Changes

Track the import tasks in memory and abort them explicitly on tenant
shutdown.

Integrate more closely with the storage controller:
1. When an import task has finished all of its jobs, it notifies the
storage controller, but **does not** mark the import as done in the
index_part. When all shards have finished importing, the storage
controller will call the `/activate_post_import` idempotent endpoint for
all of them. The handler, marks the import complete in index part,
resets the tenant if required and checks if the timeline is active yet.
2. Not directly related, but the import job now gets the starting state
from the storage controller instead of the import bucket. This paves the
way for progress checkpointing.

Related: https://github.com/neondatabase/neon/issues/11568
2025-05-13 17:49:49 +00:00
Folke Behrens
234c882a07 proxy: Expose handlers for cpu and heap profiling (#11912)
## Problem

It's difficult to understand where proxy spends most of cpu and memory.

## Summary of changes

Expose cpu and heap profiling handlers for continuous profiling.

neondatabase/cloud#22670
2025-05-13 14:58:37 +00:00
Konstantin Knizhnik
290369061f Check prefetch result in DEBUG_COMPARE_LOCAL mode (#11502)
## Problem

Prefetched and LFC results are not checked in DEBUG_COMPARE_LOCAL mode

## Summary of changes

Add check for this results as well.

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-05-13 14:13:42 +00:00
Anastasia Lubennikova
25ab16ee24 chore(compute): Postgres 17.5, 16.9, 15.13 and 14.18 (#11886)
Bump all minor versions. 

the only conflict was
src/backend/storage/smgr/smgr.c in v17
where our smgr changes conflicted with

ee578921b6
but it was trivial to resolve.
2025-05-13 13:30:09 +00:00
Vlad Lazar
cfbef4d586 safekeeper: downgrade stream from future WAL log (#11909)
## Problem

1. Safekeeper selection on the pageserver side isn't very dynamic. Once
you connect to one safekeeper, you'll use that one for as long as the
safekeeper keeps the connection alive. In principle, we could be more
eager, since the wal receiver connection can be cancelled but we don't
do that. We wait until the "session" is done and then we pick a new SK.
2. Picking a new SK is quite conservative. We will switch if: 
a. We haven't received anything from the SK within the last 10 seconds
(wal_connect_timeout) or
b. The candidate SK is 1GiB ahead or
c. The candidate SK is in the same AZ as the PS or d. There's a
candidate that is ahead and we've not had any WAL within the last 10
seconds (lagging_wal_timeout)

Hence, we can end up with pageservers that are requesting WAL which
their safekeeper hasn't seen yet.

## Summary of changes

Downgrade warning log to info.
2025-05-13 13:02:25 +00:00
Alex Chi Z.
34a42b00ca feat(pageserver): add PostHog lite client (#11821)
## Problem

part of https://github.com/neondatabase/neon/issues/11813

## Summary of changes

Add a lite PostHog client that only uses the local flag evaluation
functionality. Added a test case that parses an example feature flag and
gets the evaluation result.

TODO: support boolean flag, remote config; implement all operators in
PostHog.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-05-13 09:49:14 +00:00
Alex Chi Z.
a9979620c5 fix(remote_storage): continue on Azure+AWS retryable error (#11903)
## Problem

We implemented the retry logic in AWS S3 but not in Azure. Therefore, if
there is an error during Azure listing, we will return an Err to the
caller, and the stream will end without fetching more tenants.

Part of https://github.com/neondatabase/neon/issues/11159

Without this fix, listing tenant will stop once we hit an error (could
be network errors -- that happens more frequent on Azure). If we happen
to stop at a point that we only listed part of the shards, we will hit
the "missed shards" error or even remove layers being used.

This bug (for Azure listing) was introduced as part of
https://github.com/neondatabase/neon/pull/9840

There is also a bug that stops the stream for AWS when there's a timeout
-- this is fixed along with this patch.

## Summary of changes

Retry the request on error. In the future, we should make such streams
return something like `Result<Result<T>>` where the outer result is the
error that ends the stream and the inner one is the error that should be
retried by the caller.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-05-13 08:53:35 +00:00
Conrad Ludgate
a113c48c43 proxy: fix redis batching support (#11905)
## Problem

For `StoreCancelKey`, we were inserting 2 commands, but we were not
inserting two replies. This mismatch leads to errors when decoding the
response.

## Summary of changes

Abstract the command + reply pipeline so that commands and replies are
registered at the same time.
2025-05-13 08:33:53 +00:00
Tristan Partin
9971fba584 Properly configure the dynamic loader to load our compiled libraries (#11858)
The first line in /etc/ld.so.conf is:

	/etc/ld.so.conf.d/*

We want to control library load order so that our compiled binaries are
picked up before others from system packages. The previous solution
allowed the system libraries to load before ours.

Part-of: https://github.com/neondatabase/neon/issues/11857

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-05-12 17:36:07 +00:00
Conrad Ludgate
a77919f4b2 merge pg-sni-router into proxy (#11882)
## Problem

We realised that pg-sni-router doesn't need to be separate from proxy.
just a separate port.

## Summary of changes

Add pg-sni-router config to proxy and expose the service.
2025-05-12 15:48:48 +00:00
Jakub Kołodziejczak
a618056770 chore(compute): skip audit logs for pg_session_jwt extension (#11883)
references
https://github.com/neondatabase/cloud/issues/28480#issuecomment-2866961124

related https://github.com/neondatabase/cloud/issues/28863

cc @MihaiBojin @conradludgate
2025-05-12 11:24:33 +00:00
Alex Chi Z.
307e1e64c8 fix(scrubber): more logs wrt relic timelines (#11895)
## Problem

Further investigation on
https://github.com/neondatabase/neon/issues/11159 reveals that the
list_tenant function can find all the shards of the tenant, but then the
shard gets missing during the gc timeline list blob. One reason could be
that in some ways the timeline gets recognized as a relic timeline.

## Summary of changes

Add logging to help identify the issue.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-05-12 09:17:35 +00:00
Arpad Müller
a537b2ffd0 pull_timeline: check tombstones by default (#11873)
Make `pull_timeline` check tombstones by default. Otherwise, we'd be
recreating timelines if the order between creation and deletion got
mixed up, as seen in #11838.

Fixes #11838.
2025-05-12 07:25:54 +00:00
Christian Schwarz
64353b48db direct+concurrent IO: retroactive RFC (#11788)
refs
- direct IO epic: https://github.com/neondatabase/neon/issues/8130
- concurrent IO epic https://github.com/neondatabase/neon/issues/9378
- obsoletes direct IO proposal RFC:
https://github.com/neondatabase/neon/pull/8240
- discussion in
https://neondb.slack.com/archives/C07BZ38E6SD/p1746028030574349
2025-05-10 15:06:06 +00:00
Christian Schwarz
79ddc803af feat(direct IO): runtime alignment validation; support config flag on macOS; default to DirectRw (#11868)
This PR adds a runtime validation mode to check adherence to alignment
and size-multiple requirements at the VirtualFile level.

This can help prevent alignment bugs from slipping into production
because test systems may have more lax requirements than production.
(This is not the case today, but it could change in the future).

It also allows catching O_DIRECT bugs on systems that don't have
O_DIRECT (macOS).
Consequently, we can now accept
`virtual_file_io_mode={direct,direct-rw}` on macOS now.
This has the side benefit of removing some annoying conditional
compilation around `IoMode`.

A third benefit is that it helped weed out size-multiple requirement
violation bugs in how the VirtualFile unit tests exercise read and write
APIs.
I seized the opportunity to trim these tests down to what actually
matters, i.e., exercising of the `OpenFiles` file descriptor cache.

Lastly, this PR flips the binary-built-in default to `DirectRw` so that
when running Python regress tests and benchmarks without specifying
`PAGESERVER_VIRTUAL_FILE_IO_MODE`, one gets the production behavior.

Refs
- fixes https://github.com/neondatabase/neon/issues/11676
2025-05-10 14:19:52 +00:00
Christian Schwarz
f5070f6aa4 fixup(direct IO): PR #11864 broke test suite parametrization (#11887)
PR
- github.com/neondatabase/neon/pull/11864

committed yesterday rendered the `PAGESERVER_VIRTUAL_FILE_IO_MODE`
env-var-based parametrization ineffective.

As a consequence, the tests and benchmarks in `test_runner/` were using
the binary built-in-default, i.e., `buffered`.
2025-05-09 18:13:35 +00:00
Matthias van de Meent
3b7cc4234c Fix PS connect attempt timeouts when facing interrupts (#11880)
With the 50ms timeouts of pumping state in connector.c, we need to
correctly handle these timeouts that also wake up pg_usleep.

This new approach makes the connection attempts re-start the wait
whenever it gets woken up early; and CHECK_FOR_INTERRUPTS() is called to
make sure we don't miss query cancellations.

## Problem

https://neondb.slack.com/archives/C04DGM6SMTM/p1746794528680269

## Summary of changes

Make sure we start sleeping again if pg_usleep got woken up ahead of
time.
2025-05-09 17:02:24 +00:00
Arpad Müller
33abfc2b74 storcon: remove finished safekeeper reconciliations from in-memory hashmap (#11876)
## Problem

Currently there is a memory leak, in that finished safekeeper
reconciliations leave a cancellation token behind which is never cleaned
up.

## Summary of changes

The change adds cleanup after finishing of a reconciliation. In order to
ensure we remove the correct cancellation token, and we haven't raced
with another reconciliation, we introduce a `TokenId` counter to tell
tokens apart.

Part of https://github.com/neondatabase/neon/issues/11670
2025-05-09 13:34:22 +00:00
Alex Chi Z.
93b964f829 fix(pageserver): do not do image compaction if it's below gc cutoff (#11872)
## Problem

We observe image compaction errors after gc-compaction finishes
compacting below the gc_cutoff. This is because `repartition` returns an
LSN below the gc horizon as we (likely) determined that `distance <=
self.repartition_threshold`.

I think it's better to keep the current behavior of when to trigger
compaction but we should skip image compaction if the returned LSN is
below the gc horizon.

## Summary of changes

If the repartition returns an invalid LSN, skip image compaction.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-05-09 12:07:52 +00:00
Vlad Lazar
d0aaec2abb storage_controller: create imported timelines on safekeepers (#11801)
## Problem

SK timeline creations were skipped for imported timelines since we
didn't know the correct start LSN
of the timeline at that point.

## Summary of changes

Created imported timelines on the SK as part of the import finalize
step.
We use the last record LSN of shard 0 as the start LSN for the
safekeeper timeline.

Closes https://github.com/neondatabase/neon/issues/11569
2025-05-09 10:55:26 +00:00
Alex Chi Z.
d0dc65da12 fix(pageserver): give up gc-compaction if one key has too long history (#11869)
## Problem

The limitation we imposed last week
https://github.com/neondatabase/neon/pull/11709 is not enough to protect
excessive memory usage.

## Summary of changes

If a single key accumulated too much history, give up compaction. In the
future, we can make the `generate_key_retention` function take a stream
of keys instead of first accumulating them in memory, thus easily
support such long key history cases.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-05-09 10:12:49 +00:00
Konstantin Knizhnik
03d635b916 Add more guards for prefetch_pump_state (#11859)
## Problem

See https://neondb.slack.com/archives/C08PJ07BZ44/p1746566292750689

Looks like there are more cases when `prefetch_pump_state` can be called
in unexpected place and cause core dump.

## Summary of changes

Add more guards.

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-05-09 09:07:08 +00:00
Conrad Ludgate
5cd7f936f9 fix(neon-rls): optimistically assume role grants are already assigned for replicas (#11811)
## Problem

Read replicas cannot grant permissions for roles for Neon RLS. Usually
the permission is already granted, so we can optimistically check. See
INC-509

## Summary of changes

Perform a permission lookup prior to actually executing any grants.
2025-05-09 07:48:30 +00:00
Konstantin Knizhnik
101e115b38 Change prefetch logic in vacuum (#11650)
## Problem
See https://neondb.slack.com/archives/C03QLRH7PPD/p1745003314183649

Vacuum doesn't use prefetch because this strange logic in
`lazy_scan_heap`:

```
			/* And only up to the next unskippable block */
			if (next_prefetch_block + prefetch_budget > vacrel->next_unskippable_block)
				prefetch_budget = vacrel->next_unskippable_block - next_prefetch_block;
```
## Summary of changes

Disable prefetch only if vacuum jumps to next skippable block (there is
SKIP_PAGES_THRESHOLD) which cancel seqscan and perform jump only if gap
is large enough).


Postgres PRs:
https://github.com/neondatabase/postgres/pull/620
https://github.com/neondatabase/postgres/pull/621
https://github.com/neondatabase/postgres/pull/622
https://github.com/neondatabase/postgres/pull/623

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-05-09 06:54:40 +00:00
Christian Schwarz
b37bb7d7ed pageserver: timeline shutdown: fully quiesce ingest path beforefreeze_and_flush (#11851)
# Problem 

Before this PR, timeline shutdown would
- cancel the walreceiver cancellation token subtree (child token of
Timeline::cancel)
- call freeze_and_flush
- Timeline::cancel.cancel()
- ... bunch of waiting for things ...
- Timeline::gate.close()

As noted by the comment that is deleted by this PR, this left a window
where, after freeze_and_flush, walreceiver could still be running and
ingest data into a new InMemoryLayer.

This presents a potential source of log noise during Timeline shutdown
where the InMemoryLayer created after the freeze_and_flush observes
that Timeline::cancel is cancelled, failing the ingest with some
anyhow::Error wrapping (deeply) a `FlushTaskError::Cancelled` instance
(`flush task cancelled` error message).

# Solution

It turns out that it is quite easy to shut down, not just cancel,
walreceiver completely
because the only subtask spawned by walreceiver connection manager is
the `handle_walreceiver_connection` task, which is properly shut down
and waited upon when the manager task observes cancellation and exits
its retry loop.

The alternative is to replace all the usage of `anyhow` on the ingest
path
with differentiated error types. A lot of busywork for little gain to
fix
a potential logging noise nuisance, so, not doing that for now.

# Correctness / Risk

We do not risk leaking walreceiver child tasks because existing
discipline
is to hold a gate guard.

We will prolong `Timeline::shutdown` to the degree that we're no longer
making
progress with the rest of shutdown while the walreceiver task hasn't yet
observed cancellation. In practice, this should be negligible.

`Timeline::shutdown` could fail to complete if there is a hidden
dependency
of walreceiver shutdown on some subsystem. The code certainly suggests
there
isn't, and I'm not aware of any such dependency. Anyway, impact will be
low
because we only shut down Timeline instances that are obsolete, either
because
there is a newer attachment at a different location, or because the
timeline
got deleted by the user. We would learn about this through stuck cplane
operations or stuck storcon reconciliations. We would be able to
mitigate by
cancelling such stuck operations/reconciliations and/or by rolling back
pageserver.

# Refs
- identified this while investigating
https://github.com/neondatabase/neon/issues/11762
- PR that _does_ fix a bunch _real_ `flush task cancelled` noise on the
compaction path: https://github.com/neondatabase/neon/pull/11853
2025-05-08 18:48:24 +00:00
79 changed files with 3335 additions and 1221 deletions

21
Cargo.lock generated
View File

@@ -1303,6 +1303,7 @@ dependencies = [
"futures",
"http 1.1.0",
"indexmap 2.0.1",
"itertools 0.10.5",
"jsonwebtoken",
"metrics",
"nix 0.27.1",
@@ -4847,6 +4848,19 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "posthog_client_lite"
version = "0.1.0"
dependencies = [
"anyhow",
"reqwest",
"serde",
"serde_json",
"sha2",
"thiserror 1.0.69",
"workspace_hack",
]
[[package]]
name = "powerfmt"
version = "0.2.0"
@@ -7199,7 +7213,7 @@ dependencies = [
[[package]]
name = "tokio-epoll-uring"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=devin%2F1746710596-expose-fallocate-modes#8bd6d0e99d4937096acfe40b64fd63aa9ad2e2ea"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#781989bb540a1408b0b93daa1e9d1fa452195497"
dependencies = [
"futures",
"nix 0.26.4",
@@ -7810,7 +7824,7 @@ dependencies = [
[[package]]
name = "uring-common"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=devin%2F1746710596-expose-fallocate-modes#8bd6d0e99d4937096acfe40b64fd63aa9ad2e2ea"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#781989bb540a1408b0b93daa1e9d1fa452195497"
dependencies = [
"bytes",
"io-uring",
@@ -8438,8 +8452,10 @@ dependencies = [
"fail",
"form_urlencoded",
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-task",
"futures-util",
"generic-array",
"getrandom 0.2.11",
@@ -8469,6 +8485,7 @@ dependencies = [
"once_cell",
"p256 0.13.2",
"parquet",
"percent-encoding",
"prettyplease",
"proc-macro2",
"prost 0.13.3",

View File

@@ -26,6 +26,7 @@ members = [
"libs/utils",
"libs/consumption_metrics",
"libs/postgres_backend",
"libs/posthog_client_lite",
"libs/pq_proto",
"libs/tenant_size_model",
"libs/metrics",
@@ -187,7 +188,7 @@ thiserror = "1.0"
tikv-jemallocator = { version = "0.6", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] }
tikv-jemalloc-ctl = { version = "0.6", features = ["stats"] }
tokio = { version = "1.43.1", features = ["macros"] }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "devin/1746710596-expose-fallocate-modes" }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
tokio-io-timeout = "1.2.0"
tokio-postgres-rustls = "0.12.0"
tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"]}

View File

@@ -1117,8 +1117,8 @@ RUN wget https://github.com/microsoft/onnxruntime/archive/refs/tags/v1.18.1.tar.
mkdir onnxruntime-src && cd onnxruntime-src && tar xzf ../onnxruntime.tar.gz --strip-components=1 -C . && \
echo "#nothing to test here" > neon-test.sh
RUN wget https://github.com/neondatabase-labs/pgrag/archive/refs/tags/v0.1.1.tar.gz -O pgrag.tar.gz && \
echo "087b2ecd11ba307dc968042ef2e9e43dc04d9ba60e8306e882c407bbe1350a50 pgrag.tar.gz" | sha256sum --check && \
RUN wget https://github.com/neondatabase-labs/pgrag/archive/refs/tags/v0.1.2.tar.gz -O pgrag.tar.gz && \
echo "7361654ea24f08cbb9db13c2ee1c0fe008f6114076401bb871619690dafc5225 pgrag.tar.gz" | sha256sum --check && \
mkdir pgrag-src && cd pgrag-src && tar xzf ../pgrag.tar.gz --strip-components=1 -C .
FROM rust-extensions-build-pgrx14 AS pgrag-build
@@ -1971,7 +1971,8 @@ COPY --from=sql_exporter_preprocessor --chmod=0644 /home/nonroot/compute/etc/sql
COPY --from=sql_exporter_preprocessor --chmod=0644 /home/nonroot/compute/etc/neon_collector_autoscaling.yml /etc/neon_collector_autoscaling.yml
# Make the libraries we built available
RUN echo '/usr/local/lib' >> /etc/ld.so.conf && /sbin/ldconfig
COPY --chmod=0666 compute/etc/ld.so.conf.d/00-neon.conf /etc/ld.so.conf.d/00-neon.conf
RUN /sbin/ldconfig
# rsyslog config permissions
# directory for rsyslogd pid file

View File

@@ -0,0 +1 @@
/usr/local/lib

View File

@@ -28,6 +28,7 @@ flate2.workspace = true
futures.workspace = true
http.workspace = true
indexmap.workspace = true
itertools.workspace = true
jsonwebtoken.workspace = true
metrics.workspace = true
nix.workspace = true

View File

@@ -11,6 +11,7 @@ use compute_api::spec::{
use futures::StreamExt;
use futures::future::join_all;
use futures::stream::FuturesUnordered;
use itertools::Itertools;
use nix::sys::signal::{Signal, kill};
use nix::unistd::Pid;
use once_cell::sync::Lazy;
@@ -18,7 +19,7 @@ use postgres;
use postgres::NoTls;
use postgres::error::SqlState;
use remote_storage::{DownloadError, RemotePath};
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::os::unix::fs::{PermissionsExt, symlink};
use std::path::Path;
@@ -1995,23 +1996,40 @@ LIMIT 100",
tokio::spawn(conn);
// TODO: support other types of grants apart from schemas?
let query = format!(
"GRANT {} ON SCHEMA {} TO {}",
privileges
.iter()
// should not be quoted as it's part of the command.
// is already sanitized so it's ok
.map(|p| p.as_str())
.collect::<Vec<&'static str>>()
.join(", "),
// quote the schema and role name as identifiers to sanitize them.
schema_name.pg_quote(),
role_name.pg_quote(),
);
db_client
.simple_query(&query)
// check the role grants first - to gracefully handle read-replicas.
let select = "SELECT privilege_type
FROM pg_namespace
JOIN LATERAL (SELECT * FROM aclexplode(nspacl) AS x) acl ON true
JOIN pg_user users ON acl.grantee = users.usesysid
WHERE users.usename = $1
AND nspname = $2";
let rows = db_client
.query(select, &[role_name, schema_name])
.await
.with_context(|| format!("Failed to execute query: {}", query))?;
.with_context(|| format!("Failed to execute query: {select}"))?;
let already_granted: HashSet<String> = rows.into_iter().map(|row| row.get(0)).collect();
let grants = privileges
.iter()
.filter(|p| !already_granted.contains(p.as_str()))
// should not be quoted as it's part of the command.
// is already sanitized so it's ok
.map(|p| p.as_str())
.join(", ");
if !grants.is_empty() {
// quote the schema and role name as identifiers to sanitize them.
let schema_name = schema_name.pg_quote();
let role_name = role_name.pg_quote();
let query = format!("GRANT {grants} ON SCHEMA {schema_name} TO {role_name}",);
db_client
.simple_query(&query)
.await
.with_context(|| format!("Failed to execute query: {}", query))?;
}
Ok(())
}

View File

@@ -224,7 +224,10 @@ pub fn write_postgres_conf(
writeln!(file, "pgaudit.log_rotation_age=5")?;
// Enable audit logs for pg_session_jwt extension
writeln!(file, "pg_session_jwt.audit_log=on")?;
// TODO: Consider a good approach for shipping pg_session_jwt logs to the same sink as
// pgAudit - additional context in https://github.com/neondatabase/cloud/issues/28863
//
// writeln!(file, "pg_session_jwt.audit_log=on")?;
// Add audit shared_preload_libraries, if they are not present.
//

View File

@@ -14,6 +14,14 @@ PG_VERSION=${PG_VERSION:-14}
CONFIG_FILE_ORG=/var/db/postgres/configs/config.json
CONFIG_FILE=/tmp/config.json
# Test that the first library path that the dynamic loader looks in is the path
# that we use for custom compiled software
first_path="$(ldconfig --verbose 2>/dev/null \
| grep --invert-match ^$'\t' \
| cut --delimiter=: --fields=1 \
| head --lines=1)"
test "$first_path" == '/usr/local/lib' || true # Remove the || true in a follow-up PR. Needed for backwards compat.
echo "Waiting pageserver become ready."
while ! nc -z pageserver 6400; do
sleep 1;

View File

@@ -5,3 +5,4 @@ listen_http_addr='0.0.0.0:9898'
remote_storage={ endpoint='http://minio:9000', bucket_name='neon', bucket_region='eu-north-1', prefix_in_bucket='/pageserver' }
control_plane_api='http://0.0.0.0:6666' # No storage controller in docker compose, specify a junk address
control_plane_emergency_mode=true
virtual_file_io_mode="buffered" # the CI runners where we run the docker compose tests have slow disks

View File

@@ -7,6 +7,8 @@ Author: Christian Schwarz
A brief RFC / GitHub Epic describing a vectored version of the `Timeline::get` method that is at the heart of Pageserver.
**EDIT**: the implementation of this feature is described in [Vlad's (internal) tech talk](https://drive.google.com/file/d/1vfY24S869UP8lEUUDHRWKF1AJn8fpWoJ/view?usp=drive_link).
# Motivation
During basebackup, we issue many `Timeline::get` calls for SLRU pages that are *adjacent* in key space.

View File

@@ -0,0 +1,362 @@
# Direct IO For Pageserver
Date: Apr 30, 2025
## Summary
This document is a retroactive RFC. It
- provides some background on what direct IO is,
- motivates why Pageserver should be using it for its IO, and
- describes how we changed Pageserver to use it.
The [initial proposal](https://github.com/neondatabase/neon/pull/8240) that kicked off the work can be found in this closed GitHub PR.
People primarily involved in this project were:
- Yuchen Liang <yuchen@neon.tech>
- Vlad Lazar <vlad@neon.tech>
- Christian Schwarz <christian@neon.tech>
## Timeline
For posterity, here is the rough timeline of the development work that got us to where we are today.
- Jan 2024: [integrate `tokio-epoll-uring`](https://github.com/neondatabase/neon/pull/5824) along with owned buffers API
- March 2024: `tokio-epoll-uring` enabled in all regions in buffered IO mode
- Feb 2024 to June 2024: PS PageCache Bypass For Data Blocks
- Feb 2024: [Vectored Get Implementation](https://github.com/neondatabase/neon/pull/6576) bypasses delta & image layer blocks for page requests
- Apr to June 2024: [Epic: bypass PageCache for use data blocks](https://github.com/neondatabase/neon/issues/7386) addresses remaining users
- Aug to Nov 2024: direct IO: first code; preliminaries; read path coding; BufferedWriter; benchmarks show perf regressions too high, no-go.
- Nov 2024 to Jan 2025: address perf regressions by developing page_service pipelining (aka batching) and concurrent IO ([Epic](https://github.com/neondatabase/neon/issues/9376))
- Feb to March 2024: rollout batching, then concurrent+direct IO => read path and InMemoryLayer is now direct IO
- Apr 2025: develop & roll out direct IO for the write path
## Background: Terminology & Glossary
**kernel page cache**: the Linux kernel's page cache is a write-back cache for filesystem contents.
The cached unit is memory-page-sized & aligned chunks of the files that are being cached (typically 4k).
The cache lives in kernel memory and is not directly accessible through userspace.
**Buffered IO**: an application's read/write system calls go through the kernel page cache.
For example, a 10 byte sized read or write to offset 5000 in a file will load the file contents
at offset `[4096,8192)` into a free page in the kernel page cache. If necessary, it will evict
a page to make room (cf eviction). Then, the kernel performs a memory-to-memory copy of 10 bytes
from/to the offset `4` (`5000 = 4096 + 4`) within the cached page. If it's a write, the kernel keeps
track of the fact that the page is now "dirty" in some ancillary structure.
**Writeback**: a buffered read/write syscall returns after the memory-to-memory copy. The modifications
made by e.g. write system calls are not even *issued* to disk, let alone durable. Instead, the kernel
asynchronously writes back dirtied pages based on a variety of conditions. For us, the most relevant
ones are a) explicit request by userspace (`fsync`) and b) memory pressure.
**Memory pressure**: the kernel page cache is a best effort service and a user of spare memory capacity.
If there is no free memory, the kernel page allocator will take pages used by page cache to satisfy allocations.
Before reusing a page like that, the page has to be written back (writeback, see above).
The far-reaching consequence of this is that **any allocation of anonymous memory can do IO** if the only
way to get that memory is by eviction & re-using a dirty page cache page.
Notably, this includes a simple `malloc` in userspace, because eventually that boils down to `mmap(..., MAP_ANON, ...)`.
I refer to this effect as the "malloc latency backscatter" caused by buffered IO.
**Direct IO** allows application's read/write system calls to bypass the kernel page cache. The filesystem
is still involved because it is ultimately in charge of mapping the concept of files & offsets within them
to sectors on block devices. Typically, the filesystem poses size and alignment requirements for memory buffers
and file offsets (statx `Dio_mem_align` / `Dio_offset_align`), see [this gist](https://gist.github.com/problame/1c35cac41b7cd617779f8aae50f97155).
The IO operations will fail at runtime with EINVAL if the alignment requirements are not met.
**"buffered" vs "direct"**: the central distinction between buffered and direct IO is about who allocates and
fills the IO buffers, and who controls when exactly the IOs are issued. In buffered IO, it's the syscall handlers,
kernel page cache, and memory management subsystems (cf "writeback"). In direct IO, all of it is done by
the application.
It takes more effort by the application to program with direct instead of buffered IO.
The return is precise control over and a clear distinction between consumption/modification of memory vs disk.
**Pageserver PageCache**: Pageserver has an additional `PageCache` (referred to as PS PageCache from here on, as opposed to "kernel page cache").
Its caching unit is 8KiB blocks of the layer files written by Pageserver.
A miss in PageCache is filled by reading from the filesystem, through the `VirtualFile` abstraction layer.
The default size is tiny (64MiB), very much like Postgres's `shared_buffers`.
We ran production at 128MiB for a long time but gradually moved it up to 2GiB over the past ~year.
**VirtualFile** is Pageserver's abstraction for file IO, very similar to the facility in Postgres that bears the same name.
Its historical purpose appears to be working around open file descriptor limitations, which is practically irrelevant on Linux.
However, the facility in Pageserver is useful as an intermediary layer for metrics and abstracts over the different kinds of
IO engines that Pageserver supports (`std-fs` vs `tokio-epoll-uring`).
## Background: History Of Caching In Pageserver
For multiple years, Pageserver's `PageCache` was on the path of all read _and write_ IO.
It performed write-back to the kernel using buffered IO.
We converted it into a read-only cache of immutable data in [PR 4994](https://github.com/neondatabase/neon/pull/4994).
The introduction of `tokio-epoll-uring` required converting the code base to used owned IO buffers.
The `PageCache` pages are usable as owned IO buffers.
We then started bypassing PageCache for user data blocks.
Data blocks are the 8k blocks of data in layer files that hold the multiple `Value`s, as opposed to the disk btree index blocks that tell us which values exist in a file at what offsets.
The disk btree embedded in delta & image layers remains `PageCache`'d.
Epics for that work were:
- Vectored `Timeline::get` (cf RFC 30) skipped delta and image layer data block `PageCache`ing outright.
- Epic https://github.com/neondatabase/neon/issues/7386 took care of the remaining users for data blocks:
- Materialized page cache (cached materialized pages; shown to be ~0% hit rate in practice)
- InMemoryLayer
- Compaction
The outcome of the above:
1. All data blocks are always read through the `VirtualFile` APIs, hitting the kernel buffered read path (=> kernel page cache).
2. Indirect blocks (=disk btree blocks) would be cached in the PS `PageCache`.
In production we size the PS `PageCache` to be 2GiB.
Thus drives hit rate up to ~99.95% and the eviction rate / replacement rates down to less than 200/second on a 1-minute average, on the busiest machines.
High baseline replacement rates are treated as a signal of resource exhaustion (page cache insufficient to host working set of the PS).
The response to this is to migrate tenants away, or increase PS `PageCache` size.
It is currently manual but could be automated, e.g., in Storage Controller.
In the future, we may eliminate the `PageCache` even for indirect blocks.
For example with an LRU cache that has as unit the entire disk btree content
instead of individual blocks.
## High-Level Design
So, before work on this project started, all data block reads and the entire write path of Pageserver were using kernel-buffered IO, i.e., the kernel page cache.
We now want to get the kernel page cache out of the picture by using direct IO for all interaction with the filesystem.
This achieves the following system properties:
**Predictable VirtualFile latencies**
* With buffered IO, reads are sometimes fast, sometimes slow, depending on kernel page cache hit/miss.
* With buffered IO, appends when writing out new layer files during ingest or compaction are sometimes fast, sometimes slow because of write-back backpressure.
* With buffered IO, the "malloc backscatter" phenomenon pointed out in the Glossary section is not something we actively observe.
But we do have occasional spikes in Dirty memory amount and Memory PSI graphs, so it may already be affecting to some degree.
* By switching to direct IO, above operations will have the (predictable) device latency -- always.
Reads and appends always go to disk.
And malloc will not have to write back dirty data.
**Explicitness & Tangibility of resource usage**
* In a multi-tenant system, it is generally desirable and valuable to be *explicit* about the main resources we use for each tenant.
* By using direct IO, we become explicit about the resources *disk IOPs* and *memory capacity* in a way that was previously being conflated through the kernel page cache, outside our immediate control.
* We will be able to build per-tenant observability of resource usage ("what tenant is causing the actual IOs that are sent to the disk?").
* We will be able to build accounting & QoS by implementing an IO scheduler that is tenant aware. The kernel is not tenant-aware and can't do that.
**CPU Efficiency**
* The involvement of the kernel page cache means one additional memory-to-memory copy on read and write path.
* Direct IO will eliminate that memory-to-memory copy, if we can make the userspace buffers used for the IO calls satisfy direct IO alignment requirements.
The **trade-off** is that we no longer get the theoretical benefits of the kernel page cache. These are:
- read latency improvements for repeat reads of the same data ("locality of reference")
- asterisk: only if that state is still cache-resident by time of next access
- write throughput by having kernel page cache batch small VFS writes into bigger disk writes
- asterisk: only if memory pressure is low enough that the kernel can afford to delay writeback
We are **happy to make this trade-off**:
- Because of the advantages listed above.
- Because we empirically have enough DRAM on Pageservers to serve metadata (=index blocks) from PS PageCache.
(At just 2GiB PS PageCache size, we average a 99.95% hit rate).
So, the latency of going to disk is only for data block reads, not the index traversal.
- Because **the kernel page cache is ineffective** at high tenant density anyway (#tenants/pageserver instance).
And because dense packing of tenants will always be desirable to drive COGS down, we should design the system for it.
(See the appendix for a more detailed explanation why this is).
- So, we accept that some reads that used to be fast by circumstance will have higher but **predictable** latency than before.
### Desired End State
The desired end state of the project is as follows, and with some asterisks, we have achieved it.
All IOs of the Pageserver data path use direct IO, thereby bypassing the kernel page cache.
In particular, the "data path" includes
- the wal ingest path
- compaction
- anything on the `Timeline::get` / `Timeline::get_vectored` path.
The production Pageserver config is tuned such that virtually all non-data blocks are cached in the PS PageCache.
Hit rate target is 99.95%.
There are no regressions to ingest latency.
The total "wait-for-disk time" contribution to random getpage request latency is `O(1 read IOP latency)`.
We accomplish that by having a near 100% PS PageCache hit rate so that layer index traversal effectively never needs not wait for IO.
Thereby, it can issue all the data blocks as it traverses the index, and only wait at the end of it (concurrent IO).
The amortized "wait-for-disk time" contribution of this direct IO proposal to a series of sequential getpage requests is `1/32 * read IOP latency` for each getpage request.
We accomplish this by server-side batching of up to 32 reads into a single `Timeline::get_vectored` call.
(This is an ideal world where our batches are full - that's not the case in prod today because of lack of queue depth).
## Design & Implementation
### Prerequisites
A lot of prerequisite work had to happen to enable use of direct IO.
To meet the "wait-for-disk time" requirements from the DoD, we implement for the read path:
- page_service level server-side batching (config field `page_service_pipelining`)
- concurrent IO (config field `get_vectored_concurrent_io`)
The work for both of these these was tracked [in the epic](https://github.com/neondatabase/neon/issues/9376).
Server-side batching will likely be obsoleted by the [#proj-compute-communicator](https://github.com/neondatabase/neon/pull/10799).
The Concurrent IO work is described in retroactive RFC `2025-04-30-pageserver-concurrent-io-on-read-path.md`.
The implementation is relatively brittle and needs further investment, see the `Future Work` section in that RFC.
For the write path, and especially WAL ingest, we need to hide write latency.
We accomplish this by implementing a (`BufferedWriter`) type that does double-buffering: flushes of the filled
buffer happen in a sidecar tokio task while new writes fill a new buffer.
We refactor InMemoryLayer as well as BlobWriter (=> delta and image layer writers) to use this new `BufferedWriter`.
The most comprehensive write-up of this work is in [the PR description](https://github.com/neondatabase/neon/pull/11558).
### Ensuring Adherence to Alignment Requirements
Direct IO puts requirements on
- memory buffer alignment
- io size (=memory buffer size)
- file offset alignment
The requirements are specific to a combination of filesystem/block-device/architecture(hardware page size!).
In Neon production environments we currently use ext4 with Linux 6.1.X on AWS and Azure storage-optimized instances (locally attached NVMe).
Instead of dynamic discovery using `statx`, we statically hard-code 512 bytes as the buffer/offset alignment and size-multiple.
We made this decision because:
- a) it is compatible with all the environments we need to run in
- b) our primary workload can be small-random-read-heavy (we do merge adjacent reads if possible, but the worst case is that all `Value`s that needs to be read are far apart)
- c) 512-byte tail latency on the production instance types is much better than 4k (p99.9: 3x lower, p99.99 5x lower).
- d) hard-coding at compile-time allows us to use the Rust type system to enforce the use of only aligned IO buffers, eliminating a source of runtime errors typically associated with direct IO.
This was [discussed here](https://neondb.slack.com/archives/C07BZ38E6SD/p1725036790965549?thread_ts=1725026845.455259&cid=C07BZ38E6SD).
The new `IoBufAligned` / `IoBufAlignedMut` marker traits indicate that a given buffer meets memory alignment requirements.
All `VirtualFile` APIs and several software layers built on top of them only accept buffers that implement those traits.
Implementors of the marker traits are:
- `IoBuffer` / `IoBufferMut`: used for most reads and writes
- `PageWriteGuardBuf`: for filling PS PageCache pages (index blocks!)
The alignment requirement is infectious; it permeates bottom-up throughout the code base.
We stop the infection at roughly the same layers in the code base where we stopped permeating the
use of owned-buffers-style API for tokio-epoll-uring. The way the stopping works is by introducing
a memory-to-memory copy from/to some unaligned memory location on the stack/current/heap.
The places where we currently stop permeating are sort of arbitrary. For example, it would probably
make sense to replace more usage of `Bytes` that we know holds 8k pages with 8k-sized `IoBuffer`s.
The `IoBufAligned` / `IoBufAlignedMut` types do not protect us from the following types of runtime errors:
- non-adherence to file offset alignment requirements
- non-adherence to io size requirements
The following higher-level constructs ensure we meet the requirements:
- read path: the `ChunkedVectoredReadBuilder` and `mod vectored_dio_read` ensure reads happen at aligned offsets and in appropriate size multiples.
- write path: `BufferedWriter` only writes in multiples of the capacity, at offsets that are `start_offset+N*capacity`; see its doc comment.
Note that these types are used always, regardless of whether direct IO is enabled or not.
There are some cases where this adds unnecessary overhead to buffered IO (e.g. all memcpy's inflated to multiples of 512).
But we could not identify meaningful impact in practice when we shipped these changes while we were still using buffered IO.
### Configuration / Feature Flagging
In the previous section we described how all users of VirtualFile were changed to always adhere to direct IO alignment and size-multiple requirements.
To actually enable direct IO, all we need to do is set the `O_DIRECT` flag in `open` syscalls / io_uring operations.
We set `O_DIRECT` based on:
- the VirtualFile API used to create/open the VirtualFile instance
- the `virtual_file_io_mode` configuration flag
- the OpenOptions `read` and/or `write` flags.
The VirtualFile APIs suffixed with `_v2` are the only ones that _may_ open with `O_DIRECT` depending on the other two factors in above list.
Other APIs never use `O_DIRECT`.
(The name is bad and should really be `_maybe_direct_io`.)
The reason for having new APIs is because all code used VirtualFile but implementation and rollout happened in consecutive phases (read path, InMemoryLayer, write path).
At the VirtualFile level, context on whether an instance of VirtualFile is on read path, InMemoryLayer, or write path is not available.
The `_v2` APIs then check make the decision to set `O_DIRECT` based on the `virtual_file_io_mode` flag and the OpenOptions `read`/`write` flags.
The result is the following runtime behavior:
|what|OpenOptions|`v_f_io_mode`<br/>=`buffered`|`v_f_io_mode`<br/>=`direct`|`v_f_io_mode`<br/>=`direct-rw`|
|-|-|-|-|-|
|`DeltaLayerInner`|read|()|O_DIRECT|O_DIRECT|
|`ImageLayerInner`|read|()|O_DIRECT|O_DIRECT|
|`InMemoryLayer`|read + write|()|()*|O_DIRECT|
|`DeltaLayerWriter`| write | () | () | O_DIRECT |
|`ImageLayerWriter`| write | () | () | O_DIRECT |
|`download_layer_file`|write |()|()|O_DIRECT|
The `InMemoryLayer` is marked with `*` because there was a period when it *did* use O_DIRECT under `=direct`.
That period was when we implemented and shipped the first version of `BufferedWriter`.
We used it in `InMemoryLayer` and `download_layer_file` but it was only sensitive to `v_f_io_mode` in `InMemoryLayer`.
The introduction of `=direct-rw`, and the switch of the remaining write path to `BufferedWriter`, happened later,
in https://github.com/neondatabase/neon/pull/11558.
Note that this way of feature flagging inside VirtualFile makes it less and less a general purpose POSIX file access abstraction.
For example, with `=direct-rw` enabled, it is no longer possible to open a `VirtualFile` without `O_DIRECT`. It'll always be set.
## Correctness Validation
The correctness risks with this project were:
- Memory safety issues in the `IoBuffer` / `IoBufferMut` implementation.
These types expose an API that is largely identical to that of the `bytes` crate and/or Vec.
- Runtime errors (=> downtime / unavailability) because of non-adherence to alignment/size-multiple requirements, resulting in EINVAL on the read path.
We sadly do not have infrastructure to run pageserver under `cargo miri`.
So for memory safety issues, we relied on careful peer review.
We do assert the production-like alignment requirements in testing builds.
However, these asserts were added retroactively.
The actual validation before rollout happened in staging and pre-prod.
We eventually enabled `=direct`/`=direct-rw` for Rust unit tests and the regression test suite.
I cannot recall a single instance of staging/pre-prod/production errors caused by non-adherence to alignment/size-multiple requirements.
Evidently developer testing was good enough.
## Performance Validation
The read path went through a lot of iterations of benchmarking in staging and pre-prod.
The benchmarks in those environments demonstrated performance regressions early in the implementation.
It was actually this performance testing that made us implement batching and concurrent IO to avoid unacceptable regressions.
The write path was much quicker to validate because `bench_ingest` covered all of the (less numerous) access patterns.
## Future Work
There is minor and major follow-up work that can be considered in the future.
Check the (soon-to-be-closed) Epic https://github.com/neondatabase/neon/issues/8130's "Follow-Ups" section for a current list.
Read Path:
- PS PageCache hit rate is crucial to unlock concurrent IO and reasonable latency for random reads generally.
Instead of reactively sizing PS PageCache, we should estimate the required PS PageCache size
and potentially also use that to drive placement decisions of shards from StorageController
https://github.com/neondatabase/neon/issues/9288
- ... unless we get rid of PS PageCache entirely and cache the index block in a more specialized cache.
But even then, an estimation of the working set would be helpful to figure out caching strategy.
Write Path:
- BlobWriter and its users could switch back to a borrowed API https://github.com/neondatabase/neon/issues/10129
- ... unless we want to implement bypass mode for large writes https://github.com/neondatabase/neon/issues/10101
- The `TempVirtualFile` introduced as part of this project could internalize more of the common usage pattern: https://github.com/neondatabase/neon/issues/11692
- Reduce conditional compilation around `virtual_file_io_mode`: https://github.com/neondatabase/neon/issues/11676
Both:
- A performance simulation mode that pads VirtualFile op latencies to typical NVMe latencies, even if the underlying storage is faster.
This would avoid misleadingly good performance on developer systems and in benchmarks on systems that are less busy than production hosts.
However, padding latencies at microsecond scale is non-trivial.
Misc:
- We should finish trimming VirtualFile's scope to be truly limited to core data path read & write.
Abstractions for reading & writing pageserver config, location config, heatmaps, etc, should use
APIs in a different package (`VirtualFile::crashsafe_overwrite` and `VirtualFile::read_to_string`
are good entrypoints for cleanup.) https://github.com/neondatabase/neon/issues/11809
# Appendix
## Why Kernel Page Cache Is Ineffective At Tenant High Density
In the Motivation section, we stated:
> - **The kernel page cache ineffective** at high tenant density anyways (#tenants/pageserver instance).
The reason is that the Pageserver workload sent from Computes is whatever is a Compute cache(s) miss.
That's either sequential scans or random reads.
A random read workload simply causes cache thrashing because a packed Pageserver NVMe drive (`im4gn.2xlarge`) has ~100x more capacity than DRAM available.
It is complete waste to have the kernel page cache cache data blocks in this case.
Sequential read workloads *can* benefit iff those pages have been updated recently (=no image layer yet) and together in time/LSN space.
In such cases, the WAL records of those updates likely sit on the same delta layer block.
When Compute does a sequential scan, it sends a series of single-page requests for these individual pages.
When Pageserver processes the second request in such a series, it goes to the same delta layer block and have a kernel page cache hit.
This dependence on kernel page cache for sequential scan performance is significant, but the solution is at a higher level than generic data block caching.
We can either add a small per-connection LRU cache for such delta layer blocks.
Or we can merge those sequential requests into a larger vectored get request, which is designed to never read a block twice.
This amortizes the read latency for our delta layer block across the vectored get batch size (which currently is up to 32).
There are Pageserver-internal workloads that do sequential access (compaction, image layer generation), but these
1. are not latency-critical and can do batched access outside of the `page_service` protocol constraints (image layer generation)
2. don't actually need to reconstruct images and therefore can use totally different access methods (=> compaction can use k-way merge iterators with their own internal buffering / prefetching).

View File

@@ -0,0 +1,251 @@
# Concurrent IO for Pageserver Read Path
Date: May 6, 2025
## Summary
This document is a retroactive RFC on the Pageserver Concurrent IO work that happened in late 2024 / early 2025.
The gist of it is that Pageserver's `Timeline::get_vectored` now _issues_ the data block read operations against layer files
_as it traverses the layer map_ and only _wait_ once, for all of them, after traversal is complete.
Assuming a good PS PageCache hits on the index blocks during traversal, this drives down the "wait-for-disk" time
contribution down from `random_read_io_latency * O(number_of_values)` to `random_read_io_latency * O(1 + traversal)`.
The motivation for why this work had to happen when it happened was the switch of Pageserver to
- not cache user data blocks in PS PageCache and
- switch to use direct IO.
More context on this are given in complimentary RFC `./rfcs/2025-04-30-direct-io-for-pageserver.md`.
### Refs
- Epic: https://github.com/neondatabase/neon/issues/9378
- Prototyping happened during the Lisbon 2024 Offsite hackathon: https://github.com/neondatabase/neon/pull/9002
- Main implementation PR with good description: https://github.com/neondatabase/neon/issues/9378
Design and implementation by:
- Vlad Lazar <vlad@neon.tech>
- Christian Schwarz <christian@neon.tech>
## Background & Motivation
The Pageserver read path (`Timeline::get_vectored`) consists of two high-level steps:
- Retrieve the delta and image `Value`s required to reconstruct the requested Page@LSN (`Timeline::get_values_reconstruct_data`).
- Pass these values to walredo to reconstruct the page images.
The read path used to be single-key but has been made multi-key some time ago.
([Internal tech talk by Vlad](https://drive.google.com/file/d/1vfY24S869UP8lEUUDHRWKF1AJn8fpWoJ/view?usp=drive_link))
However, for simplicity, most of this doc will explain things in terms of a single key being requested.
The `Value` retrieval step above can be broken down into the following functions:
- **Traversal** of the layer map to figure out which `Value`s from which layer files are required for the page reconstruction.
- **Read IO Planning**: planning of the read IOs that need to be issued to the layer files / filesystem / disk.
The main job here is to coalesce the small value reads into larger filesystem-level read operations.
This layer also takes care of direct IO alignment and size-multiple requirements (cf the RFC for details.)
Check `struct VectoredReadPlanner` and `mod vectored_dio_read` for how it's done.
- **Perform the read IO** using `tokio-epoll-uring`.
Before this project, above functions were sequentially interleaved, meaning:
1. we would advance traversal, ...
2. discover, that we need to read a value, ...
3. read it from disk using `tokio-epoll-uring`, ...
4. goto 1 unless we're done.
This meant that if N `Value`s need to be read to reconstruct a page,
the time we spend waiting for disk will be we `random_read_io_latency * O(number_of_values)`.
## Design
The **traversal** and **read IO Planning** jobs still happen sequentially, layer by layer, as before.
But instead of performing the read IOs inline, we submit the IOs to a concurrent tokio task for execution.
After the last read from the last layer is submitted, we wait for the IOs to complete.
Assuming the filesystem / disk is able to actually process the submitted IOs without queuing,
we arrive at _time spent waiting for disk_ ~ `random_read_io_latency * O(1 + traversal)`.
Note this whole RFC is concerned with the steady state where all layer files required for reconstruction are resident on local NVMe.
Traversal will stall on on-demand layer download if a layer is not yet resident.
It cannot proceed without the layer being resident beccause its next step depends on the contents of the layer index.
### Avoiding Waiting For IO During Traversal
The `traversal` component in above time-spent-waiting-for-disk estimation is dominant and needs to be minimized.
Before this project, traversal needed to perform IOs for the following:
1. The time we are waiting on PS PageCache to page in the visited layers' disk btree index blocks.
2. When visiting a delta layer, reading the data block that contains a `Value` for a requested key,
to determine whether the `Value::will_init` the page and therefore traversal can stop for this key.
The solution for (1) is to raise the PS PageCache size such that the hit rate is practically 100%.
(Check out the `Background: History Of Caching In Pageserver` section in the RFC on Direct IO for more details.)
The solution for (2) is source `will_init` from the disk btree index keys, which fortunately
already encode this bit of information since the introduction of the current storage/layer format.
### Concurrent IOs, Submission & Completion
To separate IO submission from waiting for its completion,
we introduce the notion of an `IoConcurrency` struct through which IOs are issued.
An IO is an opaque future that
- captures the `tx` side of a `oneshot` channel
- performs the read IO by calling `VirtualFile::read_exact_at().await`
- sending the result into the `tx`
Issuing an IO means `Box`ing the future above and handing that `Box` over to the `IoConcurrency` struct.
The traversal code that submits the IO stores the the corresponding `oneshot::Receiver`
in the `VectoredValueReconstructState`, in the the place where we previously stored
the sequentially read `img` and `records` fields.
When we're done with traversal, we wait for all submitted IOs:
for each key, there is a future that awaits all the `oneshot::Receiver`s
for that key, and then calls into walredo to reconstruct the page image.
Walredo is now invoked concurrently for each value instead of sequentially.
Walredo itself remains unchanged.
The spawned IO futures are driven to completion by a sidecar tokio task that
is separate from the task that performs all the layer visiting and spawning of IOs.
That tasks receives the IO futures via an unbounded mpsc channel and
drives them to completion inside a `FuturedUnordered`.
### Error handling, Panics, Cancellation-Safety
There are two error classes during reconstruct data retrieval:
* traversal errors: index lookup, move to next layer, and the like
* value read IO errors
A traversal error fails the entire `get_vectored` request, as before this PR.
A value read error only fails reconstruction of that value.
Panics and dropping of the `get_vectored` future before it completes
leaves the sidecar task running and does not cancel submitted IOs
(see next section for details on sidecar task lifecycle).
All of this is safe, but, today's preference in the team is to close out
all resource usage explicitly if possible, rather than cancelling + forgetting
about it on drop. So, there is warning if we drop a
`VectoredValueReconstructState`/`ValuesReconstructState` that still has uncompleted IOs.
### Sidecar Task Lifecycle
The sidecar tokio task is spawned as part of the `IoConcurrency::spawn_from_conf` struct.
The `IoConcurrency` object acts as a handle through which IO futures are submitted.
The spawned tokio task holds the `Timeline::gate` open.
It is _not_ sensitive to `Timeline::cancel`, but instead to the `IoConcurrency` object being dropped.
Once the `IoConcurrency` struct is dropped, no new IO futures can come in
but already submitted IO futures will be driven to completion regardless.
We _could_ safely stop polling these futures because `tokio-epoll-uring` op futures are cancel-safe.
But the underlying kernel and hardware resources are not magically freed up by that.
So, again, in the interest of closing out all outstanding resource usage, we make timeline shutdown wait for sidecar tasks and their IOs to complete.
Under normal conditions, this should be in the low hundreds of microseconds.
It is advisable to make the `IoConcurrency` as long-lived as possible to minimize the amount of
tokio task churn (=> lower pressure on tokio). Generally this means creating it "high up" in the call stack.
The pain with this is that the `IoConcurrency` reference needs to be propagated "down" to
the (short-lived) functions/scope where we issue the IOs.
We would like to use `RequestContext` for this propagation in the future (issue [here](https://github.com/neondatabase/neon/issues/10460)).
For now, we just add another argument to the relevant code paths.
### Feature Gating
The `IoConcurrency` is an `enum` with two variants: `Sequential` and `SidecarTask`.
The behavior from before this project is available through `IoConcurrency::Sequential`,
which awaits the IO futures in place, without "spawning" or "submitting" them anywhere.
The `get_vectored_concurrent_io` pageserver config variable determines the runtime value,
**except** for the places that use `IoConcurrency::sequential` to get an `IoConcurrency` object.
### Alternatives Explored & Caveats Encountered
A few words on the rationale behind having a sidecar *task* and what
alternatives were considered but abandoned.
#### Why We Need A Sidecar *Task* / Why Just `FuturesUnordered` Doesn't Work
We explored to not have a sidecar task, and instead have a `FuturesUnordered` per
`Timeline::get_vectored`. We would queue all IO futures in it and poll it for the
first time after traversal is complete (i.e., at `collect_pending_ios`).
The obvious disadvantage, but not showstopper, is that we wouldn't be submitting
IOs until traversal is complete.
The showstopper however, is that deadlocks happen if we don't drive the
IO futures to completion independently of the traversal task.
The reason is that both the IO futures and the traversal task may hold _some_,
_and_ try to acquire _more_, shared limited resources.
For example, both the travseral task and IO future may try to acquire
* a `VirtualFile` file descriptor cache slot async mutex (observed during impl)
* a `tokio-epoll-uring` submission slot (observed during impl)
* a `PageCache` slot (currently this is not the case but we may move more code into the IO futures in the future)
#### Why We Don't Do `tokio::task`-per-IO-future
Another option is to spawn a short-lived `tokio::task` for each IO future.
We implemented and benchmarked it during development, but found little
throughput improvement and moderate mean & tail latency degradation.
Concerns about pressure on the tokio scheduler led us to abandon this variant.
## Future Work
In addition to what is listed here, also check the "Punted" list in the epic:
https://github.com/neondatabase/neon/issues/9378
### Enable `Timeline::get`
The only major code path that still uses `IoConcurrency::sequential` is `Timeline::get`.
The impact is that roughly the following parts of pageserver do not benefit yet:
- parts of basebackup
- reads performed by the ingest path
- most internal operations that read metadata keys (e.g. `collect_keyspace`!)
The solution is to propagate `IoConcurrency` via `RequestContext`:https://github.com/neondatabase/neon/issues/10460
The tricky part is to figure out at which level of the code the `IoConcurrency` is spawned (and added to the RequestContext).
Also, propagation via `RequestContext` makes makes it harder to tell during development whether a given
piece of code uses concurrent vs sequential mode: one has to recurisvely walk up the call tree to find the
place that puts the `IoConcurrency` into the `RequestContext`.
We'd have to use `::Sequential` as the conservative default value in a fresh `RequestContext`, and add some
observability to weed out places that fail to enrich with a properly spanwed `IoConcurrency::spawn_from_conf`.
### Concurrent On-Demand Downloads enabled by Detached Indices
As stated earlier, traversal stalls on on-demand download because its next step depends on the contents of the layer index.
Once we have separated indices from data blocks (=> https://github.com/neondatabase/neon/issues/11695)
we will only need to stall if the index is not resident. The download of the data blocks can happen concurrently or in the background. For example:
- Move the `Layer::get_or_maybe_download().await` inside the IO futures.
This goes in the opposite direction of the next "future work" item below, but it's easy to do.
- Serve the IO future directly from object storage and dispatch the layer download
to some other actor, e.g., an actor that is responsible for both downloads & eviction.
### New `tokio-epoll-uring` API That Separates Submission & Wait-For-Completion
Instead of `$op().await` style API, it would be useful to have a different `tokio-epoll-uring` API
that separates enqueuing (without necessarily `io_uring_enter`ing the kernel each time), submission,
and then wait for completion.
The `$op().await` API is too opaque, so we _have_ to stuff it into a `FuturesUnordered`.
A split API as sketched above would allow traversal to ensure an IO operation is enqueued to the kernel/disk (and get back-pressure iff the io_uring squeue is full).
While avoiding spending of CPU cycles on processing of completions while we're still traversing.
The idea gets muddied by the fact that we may self-deadlock if we submit too much without completing.
So, the submission part of the split API needs to process completions if squeue is full.
In any way, this split API is precondition for the bigger issue with the design presented here,
which we dicsuss in the next section.
### Opaque Futures Are Brittle
The use of opaque futures to represent submitted IOs is a clever hack to minimize changes & allow for near-perfect feature-gating.
However, we take on **brittleness** because callers must guarantee that the submitted futures are independent.
By our experience, it is non-trivial to identify or rule out the interdependencies.
See the lengthy doc comment on the `IoConcurrency::spawn_io` method for more details.
The better interface and proper subsystem boundary is a _descriptive_ struct of what needs to be done ("read this range from this VirtualFile into this buffer")
and get back a means to wait for completion.
The subsystem can thereby reason by its own how operations may be related;
unlike today, where the submitted opaque future can do just about anything.

View File

@@ -1832,6 +1832,7 @@ pub mod virtual_file {
Eq,
Hash,
strum_macros::EnumString,
strum_macros::EnumIter,
strum_macros::Display,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
@@ -1843,10 +1844,8 @@ pub mod virtual_file {
/// Uses buffered IO.
Buffered,
/// Uses direct IO for reads only.
#[cfg(target_os = "linux")]
Direct,
/// Use direct IO for reads and writes.
#[cfg(target_os = "linux")]
DirectRw,
}
@@ -1854,26 +1853,13 @@ pub mod virtual_file {
pub fn preferred() -> Self {
// The default behavior when running Rust unit tests without any further
// flags is to use the newest behavior (DirectRw).
// The CI uses the following environment variable to unit tests for all
// different modes.
// The CI uses the environment variable to unit tests for all different modes.
// NB: the Python regression & perf tests have their own defaults management
// that writes pageserver.toml; they do not use this variable.
if cfg!(test) {
static CACHED: LazyLock<IoMode> = LazyLock::new(|| {
utils::env::var_serde_json_string(
"NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IO_MODE",
)
.unwrap_or(
#[cfg(target_os = "linux")]
IoMode::DirectRw,
#[cfg(not(target_os = "linux"))]
IoMode::Buffered,
)
});
*CACHED
} else {
IoMode::Buffered
}
static ENV_OVERRIDE: LazyLock<Option<IoMode>> = LazyLock::new(|| {
utils::env::var_serde_json_string("NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IO_MODE")
});
ENV_OVERRIDE.unwrap_or(IoMode::DirectRw)
}
}
@@ -1883,9 +1869,7 @@ pub mod virtual_file {
fn try_from(value: u8) -> Result<Self, Self::Error> {
Ok(match value {
v if v == (IoMode::Buffered as u8) => IoMode::Buffered,
#[cfg(target_os = "linux")]
v if v == (IoMode::Direct as u8) => IoMode::Direct,
#[cfg(target_os = "linux")]
v if v == (IoMode::DirectRw as u8) => IoMode::DirectRw,
x => return Err(x),
})

View File

@@ -36,6 +36,24 @@ impl Value {
Value::WalRecord(rec) => rec.will_init(),
}
}
#[inline(always)]
pub fn estimated_size(&self) -> usize {
match self {
Value::Image(image) => image.len(),
Value::WalRecord(NeonWalRecord::AuxFile {
content: Some(content),
..
}) => content.len(),
Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
Value::WalRecord(NeonWalRecord::ClogSetAborted { xids }) => xids.len() * 4,
Value::WalRecord(NeonWalRecord::ClogSetCommitted { xids, .. }) => xids.len() * 4,
Value::WalRecord(NeonWalRecord::MultixactMembersCreate { members, .. }) => {
members.len() * 8
}
_ => 8192, /* use image size as the estimation */
}
}
}
#[derive(Debug, PartialEq)]

View File

@@ -0,0 +1,14 @@
[package]
name = "posthog_client_lite"
version = "0.1.0"
edition = "2024"
license.workspace = true
[dependencies]
anyhow.workspace = true
reqwest.workspace = true
serde.workspace = true
serde_json.workspace = true
sha2.workspace = true
workspace_hack.workspace = true
thiserror.workspace = true

View File

@@ -0,0 +1,634 @@
//! A lite version of the PostHog client that only supports local evaluation of feature flags.
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use serde_json::json;
use sha2::Digest;
#[derive(Debug, thiserror::Error)]
pub enum PostHogEvaluationError {
/// The feature flag is not available, for example, because the local evaluation data is not populated yet.
#[error("Feature flag not available: {0}")]
NotAvailable(String),
#[error("No condition group is matched")]
NoConditionGroupMatched,
/// Real errors, e.g., the rollout percentage does not add up to 100.
#[error("Failed to evaluate feature flag: {0}")]
Internal(String),
}
#[derive(Deserialize)]
pub struct LocalEvaluationResponse {
#[allow(dead_code)]
flags: Vec<LocalEvaluationFlag>,
}
#[derive(Deserialize)]
pub struct LocalEvaluationFlag {
key: String,
filters: LocalEvaluationFlagFilters,
active: bool,
}
#[derive(Deserialize)]
pub struct LocalEvaluationFlagFilters {
groups: Vec<LocalEvaluationFlagFilterGroup>,
multivariate: LocalEvaluationFlagMultivariate,
}
#[derive(Deserialize)]
pub struct LocalEvaluationFlagFilterGroup {
variant: Option<String>,
properties: Option<Vec<LocalEvaluationFlagFilterProperty>>,
rollout_percentage: i64,
}
#[derive(Deserialize)]
pub struct LocalEvaluationFlagFilterProperty {
key: String,
value: PostHogFlagFilterPropertyValue,
operator: String,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
pub enum PostHogFlagFilterPropertyValue {
String(String),
Number(f64),
Boolean(bool),
List(Vec<String>),
}
#[derive(Deserialize)]
pub struct LocalEvaluationFlagMultivariate {
variants: Vec<LocalEvaluationFlagMultivariateVariant>,
}
#[derive(Deserialize)]
pub struct LocalEvaluationFlagMultivariateVariant {
key: String,
rollout_percentage: i64,
}
pub struct FeatureStore {
flags: HashMap<String, LocalEvaluationFlag>,
}
impl Default for FeatureStore {
fn default() -> Self {
Self::new()
}
}
enum GroupEvaluationResult {
MatchedAndOverride(String),
MatchedAndEvaluate,
Unmatched,
}
impl FeatureStore {
pub fn new() -> Self {
Self {
flags: HashMap::new(),
}
}
pub fn set_flags(&mut self, flags: Vec<LocalEvaluationFlag>) {
self.flags.clear();
for flag in flags {
self.flags.insert(flag.key.clone(), flag);
}
}
/// Generate a consistent hash for a user ID (e.g., tenant ID).
///
/// The implementation is different from PostHog SDK. In PostHog SDK, it is sha1 of `user_id.distinct_id.salt`.
/// However, as we do not upload all of our tenant IDs to PostHog, we do not have the PostHog distinct_id for a
/// tenant. Therefore, the way we compute it is sha256 of `user_id.feature_id.salt`.
fn consistent_hash(user_id: &str, flag_key: &str, salt: &str) -> f64 {
let mut hasher = sha2::Sha256::new();
hasher.update(user_id);
hasher.update(".");
hasher.update(flag_key);
hasher.update(".");
hasher.update(salt);
let hash = hasher.finalize();
let hash_int = u64::from_le_bytes(hash[..8].try_into().unwrap());
hash_int as f64 / u64::MAX as f64
}
/// Evaluate a condition. Returns an error if the condition cannot be evaluated due to parsing error or missing
/// property.
fn evaluate_condition(
&self,
operator: &str,
provided: &PostHogFlagFilterPropertyValue,
requested: &PostHogFlagFilterPropertyValue,
) -> Result<bool, PostHogEvaluationError> {
match operator {
"exact" => {
let PostHogFlagFilterPropertyValue::String(provided) = provided else {
// Left should be a string
return Err(PostHogEvaluationError::Internal(format!(
"The left side of the condition is not a string: {:?}",
provided
)));
};
let PostHogFlagFilterPropertyValue::List(requested) = requested else {
// Right should be a list of string
return Err(PostHogEvaluationError::Internal(format!(
"The right side of the condition is not a list: {:?}",
requested
)));
};
Ok(requested.contains(provided))
}
"lt" | "gt" => {
let PostHogFlagFilterPropertyValue::String(requested) = requested else {
// Right should be a string
return Err(PostHogEvaluationError::Internal(format!(
"The right side of the condition is not a string: {:?}",
requested
)));
};
let Ok(requested) = requested.parse::<f64>() else {
return Err(PostHogEvaluationError::Internal(format!(
"Can not parse the right side of the condition as a number: {:?}",
requested
)));
};
// Left can either be a number or a string
let provided = match provided {
PostHogFlagFilterPropertyValue::Number(provided) => *provided,
PostHogFlagFilterPropertyValue::String(provided) => {
let Ok(provided) = provided.parse::<f64>() else {
return Err(PostHogEvaluationError::Internal(format!(
"Can not parse the left side of the condition as a number: {:?}",
provided
)));
};
provided
}
_ => {
return Err(PostHogEvaluationError::Internal(format!(
"The left side of the condition is not a number or a string: {:?}",
provided
)));
}
};
match operator {
"lt" => Ok(provided < requested),
"gt" => Ok(provided > requested),
op => Err(PostHogEvaluationError::Internal(format!(
"Unsupported operator: {}",
op
))),
}
}
_ => Err(PostHogEvaluationError::Internal(format!(
"Unsupported operator: {}",
operator
))),
}
}
/// Evaluate a percentage.
fn evaluate_percentage(&self, mapped_user_id: f64, percentage: i64) -> bool {
mapped_user_id <= percentage as f64 / 100.0
}
/// Evaluate a filter group for a feature flag. Returns an error if there are errors during the evaluation.
///
/// Return values:
/// Ok(GroupEvaluationResult::MatchedAndOverride(variant)): matched and evaluated to this value
/// Ok(GroupEvaluationResult::MatchedAndEvaluate): condition matched but no variant override, use the global rollout percentage
/// Ok(GroupEvaluationResult::Unmatched): condition unmatched
fn evaluate_group(
&self,
group: &LocalEvaluationFlagFilterGroup,
hash_on_group_rollout_percentage: f64,
provided_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
) -> Result<GroupEvaluationResult, PostHogEvaluationError> {
if let Some(ref properties) = group.properties {
for property in properties {
if let Some(value) = provided_properties.get(&property.key) {
// The user provided the property value
if !self.evaluate_condition(
property.operator.as_ref(),
value,
&property.value,
)? {
return Ok(GroupEvaluationResult::Unmatched);
}
} else {
// We cannot evaluate, the property is not available
return Err(PostHogEvaluationError::NotAvailable(format!(
"The required property in the condition is not available: {}",
property.key
)));
}
}
}
// The group has no condition matchers or we matched the properties
if self.evaluate_percentage(hash_on_group_rollout_percentage, group.rollout_percentage) {
if let Some(ref variant_override) = group.variant {
Ok(GroupEvaluationResult::MatchedAndOverride(
variant_override.clone(),
))
} else {
Ok(GroupEvaluationResult::MatchedAndEvaluate)
}
} else {
Ok(GroupEvaluationResult::Unmatched)
}
}
/// Evaluate a multivariate feature flag. Returns `None` if the flag is not available or if there are errors
/// during the evaluation.
///
/// The parsing logic is as follows:
///
/// * Match each filter group.
/// - If a group is matched, it will first determine whether the user is in the range of the group's rollout
/// percentage. We will generate a consistent hash for the user ID on the group rollout percentage. This hash
/// is shared across all groups.
/// - If the hash falls within the group's rollout percentage, return the variant if it's overridden, or
/// - Evaluate the variant using the global config and the global rollout percentage.
/// * Otherwise, continue with the next group until all groups are evaluated and no group is within the
/// rollout percentage.
/// * If there are no matching groups, return an error.
///
/// Example: we have a multivariate flag with 3 groups of the configured global rollout percentage: A (10%), B (20%), C (70%).
/// There is a single group with a condition that has a rollout percentage of 10% and it does not have a variant override.
/// Then, we will have 1% of the users evaluated to A, 2% to B, and 7% to C.
pub fn evaluate_multivariate(
&self,
flag_key: &str,
user_id: &str,
) -> Result<String, PostHogEvaluationError> {
let hash_on_global_rollout_percentage =
Self::consistent_hash(user_id, flag_key, "multivariate");
let hash_on_group_rollout_percentage =
Self::consistent_hash(user_id, flag_key, "within_group");
self.evaluate_multivariate_inner(
flag_key,
hash_on_global_rollout_percentage,
hash_on_group_rollout_percentage,
&HashMap::new(),
)
}
/// Evaluate a multivariate feature flag. Note that we directly take the mapped user ID
/// (a consistent hash ranging from 0 to 1) so that it is easier to use it in the tests
/// and avoid duplicate computations.
///
/// Use a different consistent hash for evaluating the group rollout percentage.
/// The behavior: if the condition is set to rolling out to 10% of the users, and
/// we set the variant A to 20% in the global config, then 2% of the total users will
/// be evaluated to variant A.
///
/// Note that the hash to determine group rollout percentage is shared across all groups. So if we have two
/// exactly-the-same conditions with 10% and 20% rollout percentage respectively, a total of 20% of the users
/// will be evaluated (versus 30% if group evaluation is done independently).
pub(crate) fn evaluate_multivariate_inner(
&self,
flag_key: &str,
hash_on_global_rollout_percentage: f64,
hash_on_group_rollout_percentage: f64,
properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
) -> Result<String, PostHogEvaluationError> {
if let Some(flag_config) = self.flags.get(flag_key) {
if !flag_config.active {
return Err(PostHogEvaluationError::NotAvailable(format!(
"The feature flag is not active: {}",
flag_key
)));
}
// TODO: sort the groups so that variant overrides always get evaluated first and it follows the PostHog
// Python SDK behavior; for now we do not configure conditions without variant overrides in Neon so it
// does not matter.
for group in &flag_config.filters.groups {
match self.evaluate_group(group, hash_on_group_rollout_percentage, properties)? {
GroupEvaluationResult::MatchedAndOverride(variant) => return Ok(variant),
GroupEvaluationResult::MatchedAndEvaluate => {
let mut percentage = 0;
for variant in &flag_config.filters.multivariate.variants {
percentage += variant.rollout_percentage;
if self
.evaluate_percentage(hash_on_global_rollout_percentage, percentage)
{
return Ok(variant.key.clone());
}
}
// This should not happen because the rollout percentage always adds up to 100, but just in case that PostHog
// returned invalid spec, we return an error.
return Err(PostHogEvaluationError::Internal(format!(
"Rollout percentage does not add up to 100: {}",
flag_key
)));
}
GroupEvaluationResult::Unmatched => continue,
}
}
// If no group is matched, the feature is not available, and up to the caller to decide what to do.
Err(PostHogEvaluationError::NoConditionGroupMatched)
} else {
// The feature flag is not available yet
Err(PostHogEvaluationError::NotAvailable(format!(
"Not found in the local evaluation spec: {}",
flag_key
)))
}
}
}
/// A lite PostHog client.
///
/// At the point of writing this code, PostHog does not have a functional Rust client with feature flag support.
/// This is a lite version that only supports local evaluation of feature flags and only supports those JSON specs
/// that will be used within Neon.
///
/// PostHog is designed as a browser-server system: the browser (client) side uses the client key and is exposed
/// to the end users; the server side uses a server key and is not exposed to the end users. The client and the
/// server has different API keys and provide a different set of APIs. In Neon, we only have the server (that is
/// pageserver), and it will use both the client API and the server API. So we need to store two API keys within
/// our PostHog client.
///
/// The server API is used to fetch the feature flag specs. The client API is used to capture events in case we
/// want to report the feature flag usage back to PostHog. The current plan is to use PostHog only as an UI to
/// configure feature flags so it is very likely that the client API will not be used.
pub struct PostHogClient {
/// The server API key.
server_api_key: String,
/// The client API key.
client_api_key: String,
/// The project ID.
project_id: String,
/// The private API URL.
private_api_url: String,
/// The public API URL.
public_api_url: String,
/// The HTTP client.
client: reqwest::Client,
}
impl PostHogClient {
pub fn new(
server_api_key: String,
client_api_key: String,
project_id: String,
private_api_url: String,
public_api_url: String,
) -> Self {
let client = reqwest::Client::new();
Self {
server_api_key,
client_api_key,
project_id,
private_api_url,
public_api_url,
client,
}
}
pub fn new_with_us_region(
server_api_key: String,
client_api_key: String,
project_id: String,
) -> Self {
Self::new(
server_api_key,
client_api_key,
project_id,
"https://us.posthog.com".to_string(),
"https://us.i.posthog.com".to_string(),
)
}
/// Fetch the feature flag specs from the server.
///
/// This is unfortunately an undocumented API at:
/// - <https://posthog.com/docs/api/feature-flags#get-api-projects-project_id-feature_flags-local_evaluation>
/// - <https://posthog.com/docs/feature-flags/local-evaluation>
///
/// The handling logic in [`FeatureStore`] mostly follows the Python API implementation.
/// See `_compute_flag_locally` in <https://github.com/PostHog/posthog-python/blob/master/posthog/client.py>
pub async fn get_feature_flags_local_evaluation(
&self,
) -> anyhow::Result<LocalEvaluationResponse> {
// BASE_URL/api/projects/:project_id/feature_flags/local_evaluation
// with bearer token of self.server_api_key
let url = format!(
"{}/api/projects/{}/feature_flags/local_evaluation",
self.private_api_url, self.project_id
);
let response = self
.client
.get(url)
.bearer_auth(&self.server_api_key)
.send()
.await?;
let body = response.text().await?;
Ok(serde_json::from_str(&body)?)
}
/// Capture an event. This will only be used to report the feature flag usage back to PostHog, though
/// it also support a lot of other functionalities.
///
/// <https://posthog.com/docs/api/capture>
pub async fn capture_event(
&self,
event: &str,
distinct_id: &str,
properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
) -> anyhow::Result<()> {
// PUBLIC_URL/capture/
// with bearer token of self.client_api_key
let url = format!("{}/capture/", self.public_api_url);
self.client
.post(url)
.body(serde_json::to_string(&json!({
"api_key": self.client_api_key,
"distinct_id": distinct_id,
"event": event,
"properties": properties,
}))?)
.send()
.await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
fn data() -> &'static str {
r#"{
"flags": [
{
"id": 132794,
"team_id": 152860,
"name": "",
"key": "gc-compaction",
"filters": {
"groups": [
{
"variant": "enabled-stage-2",
"properties": [
{
"key": "plan_type",
"type": "person",
"value": [
"free"
],
"operator": "exact"
},
{
"key": "pageserver_remote_size",
"type": "person",
"value": "10000000",
"operator": "lt"
}
],
"rollout_percentage": 50
},
{
"properties": [
{
"key": "plan_type",
"type": "person",
"value": [
"free"
],
"operator": "exact"
},
{
"key": "pageserver_remote_size",
"type": "person",
"value": "10000000",
"operator": "lt"
}
],
"rollout_percentage": 80
}
],
"payloads": {},
"multivariate": {
"variants": [
{
"key": "disabled",
"name": "",
"rollout_percentage": 90
},
{
"key": "enabled-stage-1",
"name": "",
"rollout_percentage": 10
},
{
"key": "enabled-stage-2",
"name": "",
"rollout_percentage": 0
},
{
"key": "enabled-stage-3",
"name": "",
"rollout_percentage": 0
},
{
"key": "enabled",
"name": "",
"rollout_percentage": 0
}
]
}
},
"deleted": false,
"active": true,
"ensure_experience_continuity": false,
"has_encrypted_payloads": false,
"version": 6
}
],
"group_type_mapping": {},
"cohorts": {}
}"#
}
#[test]
fn parse_local_evaluation() {
let data = data();
let _: LocalEvaluationResponse = serde_json::from_str(data).unwrap();
}
#[test]
fn evaluate_multivariate() {
let mut store = FeatureStore::new();
let response: LocalEvaluationResponse = serde_json::from_str(data()).unwrap();
store.set_flags(response.flags);
// This lacks the required properties and cannot be evaluated.
let variant =
store.evaluate_multivariate_inner("gc-compaction", 1.00, 0.40, &HashMap::new());
assert!(matches!(
variant,
Err(PostHogEvaluationError::NotAvailable(_))
),);
let properties_unmatched = HashMap::from([
(
"plan_type".to_string(),
PostHogFlagFilterPropertyValue::String("paid".to_string()),
),
(
"pageserver_remote_size".to_string(),
PostHogFlagFilterPropertyValue::Number(1000.0),
),
]);
// This does not match any group so there will be an error.
let variant =
store.evaluate_multivariate_inner("gc-compaction", 1.00, 0.40, &properties_unmatched);
assert!(matches!(
variant,
Err(PostHogEvaluationError::NoConditionGroupMatched)
),);
let variant =
store.evaluate_multivariate_inner("gc-compaction", 0.80, 0.80, &properties_unmatched);
assert!(matches!(
variant,
Err(PostHogEvaluationError::NoConditionGroupMatched)
),);
let properties = HashMap::from([
(
"plan_type".to_string(),
PostHogFlagFilterPropertyValue::String("free".to_string()),
),
(
"pageserver_remote_size".to_string(),
PostHogFlagFilterPropertyValue::Number(1000.0),
),
]);
// It matches the first group as 0.10 <= 0.50 and the properties are matched. Then it gets evaluated to the variant override.
let variant = store.evaluate_multivariate_inner("gc-compaction", 0.10, 0.10, &properties);
assert_eq!(variant.unwrap(), "enabled-stage-2".to_string());
// It matches the second group as 0.50 <= 0.60 <= 0.80 and the properties are matched. Then it gets evaluated using the global percentage.
let variant = store.evaluate_multivariate_inner("gc-compaction", 0.99, 0.60, &properties);
assert_eq!(variant.unwrap(), "enabled-stage-1".to_string());
let variant = store.evaluate_multivariate_inner("gc-compaction", 0.80, 0.60, &properties);
assert_eq!(variant.unwrap(), "disabled".to_string());
// It matches the group conditions but not the group rollout percentage.
let variant = store.evaluate_multivariate_inner("gc-compaction", 1.00, 0.90, &properties);
assert!(matches!(
variant,
Err(PostHogEvaluationError::NoConditionGroupMatched)
),);
}
}

View File

@@ -330,11 +330,18 @@ impl AzureBlobStorage {
if let Err(DownloadError::Timeout) = &next_item {
timeout_try_cnt += 1;
if timeout_try_cnt <= 5 {
continue;
continue 'outer;
}
}
let next_item = next_item?;
let next_item = match next_item {
Ok(next_item) => next_item,
Err(e) => {
// The error is potentially retryable, so we must rewind the loop after yielding.
yield Err(e);
continue 'outer;
},
};
// Log a warning if we saw two timeouts in a row before a successful request
if timeout_try_cnt > 2 {

View File

@@ -657,7 +657,14 @@ impl RemoteStorage for S3Bucket {
res = request => Ok(res),
_ = tokio::time::sleep(self.timeout) => Err(DownloadError::Timeout),
_ = cancel.cancelled() => Err(DownloadError::Cancelled),
}?;
};
if let Err(DownloadError::Timeout) = &response {
yield Err(DownloadError::Timeout);
continue 'outer;
}
let response = response?; // always yield cancellation errors and stop the stream
let response = response
.context("Failed to list S3 prefixes")

View File

@@ -299,6 +299,7 @@ pub struct PullTimelineRequest {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub http_hosts: Vec<String>,
pub ignore_tombstone: Option<bool>,
}
#[derive(Debug, Serialize, Deserialize)]

View File

@@ -14,6 +14,7 @@ use pageserver_api::key::Key;
use pageserver_api::models::virtual_file::IoMode;
use pageserver_api::shard::TenantShardId;
use pageserver_api::value::Value;
use strum::IntoEnumIterator;
use tokio_util::sync::CancellationToken;
use utils::bin_ser::BeSer;
use utils::id::{TenantId, TimelineId};
@@ -244,13 +245,7 @@ fn criterion_benchmark(c: &mut Criterion) {
];
let exploded_parameters = {
let mut out = Vec::new();
for io_mode in [
IoMode::Buffered,
#[cfg(target_os = "linux")]
IoMode::Direct,
#[cfg(target_os = "linux")]
IoMode::DirectRw,
] {
for io_mode in IoMode::iter() {
for param in expect.clone() {
let HandPickedParameters {
volume_mib,

View File

@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::error::Error as _;
use std::time::Duration;
use bytes::Bytes;
use detach_ancestor::AncestorDetached;
@@ -819,4 +820,25 @@ impl Client {
.await
.map(|resp| resp.status())
}
pub async fn activate_post_import(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
activate_timeline_timeout: Duration,
) -> Result<TimelineInfo> {
let uri = format!(
"{}/v1/tenant/{}/timeline/{}/activate_post_import?timeline_activate_timeout_ms={}",
self.mgmt_api_endpoint,
tenant_shard_id,
timeline_id,
activate_timeline_timeout.as_millis()
);
self.request(Method::PUT, uri, ())
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
}

View File

@@ -53,6 +53,11 @@ pub trait StorageControllerUpcallApi {
timeline_id: TimelineId,
status: ShardImportStatus,
) -> impl Future<Output = Result<(), RetryForeverError>> + Send;
fn get_timeline_import_status(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> impl Future<Output = Result<Option<ShardImportStatus>, RetryForeverError>> + Send;
}
impl StorageControllerUpcallClient {
@@ -302,4 +307,39 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
self.retry_http_forever(&url, request).await
}
#[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
async fn get_timeline_import_status(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> Result<Option<ShardImportStatus>, RetryForeverError> {
let url = self
.base_url
.join(format!("timeline_import_status/{}/{}", tenant_shard_id, timeline_id).as_str())
.expect("Failed to build path");
Ok(backoff::retry(
|| async {
let response = self.http_client.get(url.clone()).send().await?;
if let Err(err) = response.error_for_status_ref() {
if matches!(err.status(), Some(reqwest::StatusCode::NOT_FOUND)) {
return Ok(None);
} else {
return Err(err);
}
}
response.json::<ShardImportStatus>().await.map(Some)
},
|_| false,
3,
u32::MAX,
"storage controller upcall",
&self.cancel,
)
.await
.ok_or(RetryForeverError::ShuttingDown)?
.expect("We retry forever, this should never be reached"))
}
}

View File

@@ -663,6 +663,7 @@ mod test {
use camino::Utf8Path;
use hex_literal::hex;
use pageserver_api::key::Key;
use pageserver_api::models::ShardImportStatus;
use pageserver_api::shard::ShardIndex;
use pageserver_api::upcall_api::ReAttachResponseTenant;
use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
@@ -796,6 +797,14 @@ mod test {
) -> Result<(), RetryForeverError> {
unimplemented!()
}
async fn get_timeline_import_status(
&self,
_tenant_shard_id: TenantShardId,
_timeline_id: TimelineId,
) -> Result<Option<ShardImportStatus>, RetryForeverError> {
unimplemented!()
}
}
async fn setup(test_name: &str) -> anyhow::Result<TestSetup> {

View File

@@ -3500,6 +3500,107 @@ async fn put_tenant_timeline_import_wal(
}.instrument(span).await
}
/// Activate a timeline after its import has completed
///
/// The endpoint is idempotent and callers are expected to retry all
/// errors until a successful response.
async fn activate_post_import_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
const DEFAULT_ACTIVATE_TIMEOUT: Duration = Duration::from_secs(1);
let activate_timeout = parse_query_param(&request, "timeline_activate_timeout_ms")?
.map(Duration::from_millis)
.unwrap_or(DEFAULT_ACTIVATE_TIMEOUT);
let span = info_span!(
"activate_post_import_handler",
tenant_id=%tenant_shard_id.tenant_id,
timeline_id=%timeline_id,
shard_id=%tenant_shard_id.shard_slug()
);
async move {
let state = get_state(&request);
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
tenant
.finalize_importing_timeline(timeline_id)
.await
.map_err(ApiError::InternalServerError)?;
match tenant.get_timeline(timeline_id, false) {
Ok(_timeline) => {
// Timeline is already visible. Reset not required: fall through.
}
Err(GetTimelineError::NotFound { .. }) => {
// This is crude: we reset the whole tenant such that the new timeline is detected
// and activated. We can come up with something more granular in the future.
//
// Note that we only reset the tenant if required: when the timeline is
// not present in [`Tenant::timelines`].
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
state
.tenant_manager
.reset_tenant(tenant_shard_id, false, &ctx)
.await
.map_err(ApiError::InternalServerError)?;
}
Err(GetTimelineError::ShuttingDown) => {
return Err(ApiError::ShuttingDown);
}
Err(GetTimelineError::NotActive { .. }) => {
unreachable!("Called get_timeline with active_only=false");
}
}
let timeline = tenant.get_timeline(timeline_id, false)?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn)
.with_scope_timeline(&timeline);
let result =
tokio::time::timeout(activate_timeout, timeline.wait_to_become_active(&ctx)).await;
match result {
Ok(Ok(())) => {
// fallthrough
}
// Timeline reached some other state that's not active
// TODO(vlad): if the tenant is broken, return a permananet error
Ok(Err(_timeline_state)) => {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Timeline activation failed"
)));
}
// Activation timed out
Err(_) => {
return Err(ApiError::Timeout("Timeline activation timed out".into()));
}
}
let timeline_info = build_timeline_info(
&timeline, false, // include_non_incremental_logical_size,
false, // force_await_initial_logical_size
&ctx,
)
.await
.context("get local timeline info")
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, timeline_info)
}
.instrument(span)
.await
}
/// Read the end of a tar archive.
///
/// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
@@ -3924,5 +4025,9 @@ pub fn make_router(
"/v1/tenant/:tenant_id/timeline/:timeline_id/import_wal",
|r| api_handler(r, put_tenant_timeline_import_wal),
)
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/activate_post_import",
|r| api_handler(r, activate_post_import_handler),
)
.any(handler_404))
}

View File

@@ -1308,7 +1308,6 @@ pub(crate) enum StorageIoOperation {
Fsync,
Metadata,
SetLen,
Fallocate,
}
impl StorageIoOperation {
@@ -1324,7 +1323,6 @@ impl StorageIoOperation {
StorageIoOperation::Fsync => "fsync",
StorageIoOperation::Metadata => "metadata",
StorageIoOperation::SetLen => "set_len",
StorageIoOperation::Fallocate => "fallocate",
}
}
}

View File

@@ -50,7 +50,9 @@ use crate::span::{
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
};
use crate::tenant::storage_layer::IoConcurrency;
use crate::tenant::timeline::{GetVectoredError, VersionedKeySpaceQuery};
use crate::tenant::timeline::{
GetVectoredError, MissingKeyError, RelSizeCacheEntry, VersionedKeySpaceQuery,
};
/// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
pub const MAX_AUX_FILE_DELTAS: usize = 1024;
@@ -470,8 +472,26 @@ impl Timeline {
));
}
if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
return Ok(nblocks);
if let Some(entry) = self.get_cached_rel_size(&tag, version.get_lsn()) {
match entry {
RelSizeCacheEntry::Present(nblocks) => {
return Ok(nblocks);
}
RelSizeCacheEntry::Truncated => {
let key = rel_size_to_key(tag);
return Err(PageReconstructError::MissingKey(Box::new(
MissingKeyError {
keyspace: KeySpace::single(key..key.next()),
shard: self.get_shard_identity().number,
query: None,
original_hwm_lsn: version.get_lsn(),
ancestor_lsn: None,
read_path: None,
backtrace: None,
},
)));
}
}
}
if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
@@ -510,8 +530,15 @@ impl Timeline {
}
// first try to lookup relation in cache
if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
return Ok(true);
if let Some(entry) = self.get_cached_rel_size(&tag, version.get_lsn()) {
match entry {
RelSizeCacheEntry::Present(_) => {
return Ok(true);
}
RelSizeCacheEntry::Truncated => {
return Ok(false);
}
}
}
// then check if the database was already initialized.
// get_rel_exists can be called before dbdir is created.
@@ -1330,12 +1357,12 @@ impl Timeline {
}
/// Get cached size of relation if it not updated after specified LSN
pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<RelSizeCacheEntry> {
let rel_size_cache = self.rel_size_cache.read().unwrap();
if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
if let Some((cached_lsn, entry)) = rel_size_cache.map.get(tag) {
if lsn >= *cached_lsn {
RELSIZE_CACHE_HITS.inc();
return Some(*nblocks);
return Some(*entry);
}
RELSIZE_CACHE_MISSES_OLD.inc();
}
@@ -1359,11 +1386,11 @@ impl Timeline {
hash_map::Entry::Occupied(mut entry) => {
let cached_lsn = entry.get_mut();
if lsn >= cached_lsn.0 {
*cached_lsn = (lsn, nblocks);
*cached_lsn = (lsn, RelSizeCacheEntry::Present(nblocks));
}
}
hash_map::Entry::Vacant(entry) => {
entry.insert((lsn, nblocks));
entry.insert((lsn, RelSizeCacheEntry::Present(nblocks)));
RELSIZE_CACHE_ENTRIES.inc();
}
}
@@ -1372,15 +1399,23 @@ impl Timeline {
/// Store cached relation size
pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
if rel_size_cache.map.insert(tag, (lsn, nblocks)).is_none() {
if rel_size_cache
.map
.insert(tag, (lsn, RelSizeCacheEntry::Present(nblocks)))
.is_none()
{
RELSIZE_CACHE_ENTRIES.inc();
}
}
/// Remove cached relation size
pub fn remove_cached_rel_size(&self, tag: &RelTag) {
pub fn remove_cached_rel_size(&self, tag: RelTag, lsn: Lsn) {
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
if rel_size_cache.map.remove(tag).is_some() {
if rel_size_cache
.map
.insert(tag, (lsn, RelSizeCacheEntry::Truncated))
.is_some()
{
RELSIZE_CACHE_ENTRIES.dec();
}
}
@@ -1585,7 +1620,9 @@ impl DatadirModification<'_> {
// check the cache too. This is because eagerly checking the cache results in
// less work overall and 10% better performance. It's more work on cache miss
// but cache miss is rare.
if let Some(nblocks) = self.tline.get_cached_rel_size(&rel, self.get_lsn()) {
if let Some(RelSizeCacheEntry::Present(nblocks)) =
self.tline.get_cached_rel_size(&rel, self.get_lsn())
{
Ok(nblocks)
} else if !self
.tline
@@ -2172,7 +2209,7 @@ impl DatadirModification<'_> {
self.pending_nblocks -= old_size as i64;
// Remove entry from relation size cache
self.tline.remove_cached_rel_size(&rel_tag);
self.tline.remove_cached_rel_size(rel_tag, self.lsn);
// Delete size entry, as well as all blocks; this is currently a no-op because we haven't implemented tombstones in storage.
self.delete(rel_key_range(rel_tag));

View File

@@ -50,6 +50,7 @@ use remote_timeline_client::{
use secondary::heatmap::{HeatMapTenant, HeatMapTimeline};
use storage_broker::BrokerClientChannel;
use timeline::compaction::{CompactionOutcome, GcCompactionQueue};
use timeline::import_pgdata::ImportingTimeline;
use timeline::offload::{OffloadError, offload_timeline};
use timeline::{
CompactFlags, CompactOptions, CompactionError, PreviousHeatmap, ShutdownMode, import_pgdata,
@@ -284,6 +285,19 @@ pub struct TenantShard {
/// **Lock order**: if acquiring all (or a subset), acquire them in order `timelines`, `timelines_offloaded`, `timelines_creating`
timelines_offloaded: Mutex<HashMap<TimelineId, Arc<OffloadedTimeline>>>,
/// Tracks the timelines that are currently importing into this tenant shard.
///
/// Note that importing timelines are also present in [`Self::timelines_creating`].
/// Keep this in mind when ordering lock acquisition.
///
/// Lifetime:
/// * An imported timeline is created while scanning the bucket on tenant attach
/// if the index part contains an `import_pgdata` entry and said field marks the import
/// as in progress.
/// * Imported timelines are removed when the storage controller calls the post timeline
/// import activation endpoint.
timelines_importing: std::sync::Mutex<HashMap<TimelineId, ImportingTimeline>>,
/// The last tenant manifest known to be in remote storage. None if the manifest has not yet
/// been either downloaded or uploaded. Always Some after tenant attach.
///
@@ -923,19 +937,10 @@ enum StartCreatingTimelineResult {
#[allow(clippy::large_enum_variant, reason = "TODO")]
enum TimelineInitAndSyncResult {
ReadyToActivate(Arc<Timeline>),
ReadyToActivate,
NeedsSpawnImportPgdata(TimelineInitAndSyncNeedsSpawnImportPgdata),
}
impl TimelineInitAndSyncResult {
fn ready_to_activate(self) -> Option<Arc<Timeline>> {
match self {
Self::ReadyToActivate(timeline) => Some(timeline),
_ => None,
}
}
}
#[must_use]
struct TimelineInitAndSyncNeedsSpawnImportPgdata {
timeline: Arc<Timeline>,
@@ -1012,10 +1017,6 @@ enum CreateTimelineCause {
enum LoadTimelineCause {
Attach,
Unoffload,
ImportPgdata {
create_guard: TimelineCreateGuard,
activate: ActivateTimelineArgs,
},
}
#[derive(thiserror::Error, Debug)]
@@ -1097,7 +1098,7 @@ impl TenantShard {
self: &Arc<Self>,
timeline_id: TimelineId,
resources: TimelineResources,
mut index_part: IndexPart,
index_part: IndexPart,
metadata: TimelineMetadata,
previous_heatmap: Option<PreviousHeatmap>,
ancestor: Option<Arc<Timeline>>,
@@ -1106,7 +1107,7 @@ impl TenantShard {
) -> anyhow::Result<TimelineInitAndSyncResult> {
let tenant_id = self.tenant_shard_id;
let import_pgdata = index_part.import_pgdata.take();
let import_pgdata = index_part.import_pgdata.clone();
let idempotency = match &import_pgdata {
Some(import_pgdata) => {
CreateTimelineIdempotency::ImportPgdata(CreatingTimelineIdempotencyImportPgdata {
@@ -1127,7 +1128,7 @@ impl TenantShard {
}
};
let (timeline, timeline_ctx) = self.create_timeline_struct(
let (timeline, _timeline_ctx) = self.create_timeline_struct(
timeline_id,
&metadata,
previous_heatmap,
@@ -1197,14 +1198,6 @@ impl TenantShard {
match import_pgdata {
Some(import_pgdata) if !import_pgdata.is_done() => {
match cause {
LoadTimelineCause::Attach | LoadTimelineCause::Unoffload => (),
LoadTimelineCause::ImportPgdata { .. } => {
unreachable!(
"ImportPgdata should not be reloading timeline import is done and persisted as such in s3"
)
}
}
let mut guard = self.timelines_creating.lock().unwrap();
if !guard.insert(timeline_id) {
// We should never try and load the same timeline twice during startup
@@ -1260,26 +1253,7 @@ impl TenantShard {
"Timeline has no ancestor and no layer files"
);
match cause {
LoadTimelineCause::Attach | LoadTimelineCause::Unoffload => (),
LoadTimelineCause::ImportPgdata {
create_guard,
activate,
} => {
// TODO: see the comment in the task code above how I'm not so certain
// it is safe to activate here because of concurrent shutdowns.
match activate {
ActivateTimelineArgs::Yes { broker_client } => {
info!("activating timeline after reload from pgdata import task");
timeline.activate(self.clone(), broker_client, None, &timeline_ctx);
}
ActivateTimelineArgs::No => (),
}
drop(create_guard);
}
}
Ok(TimelineInitAndSyncResult::ReadyToActivate(timeline))
Ok(TimelineInitAndSyncResult::ReadyToActivate)
}
}
}
@@ -1768,7 +1742,7 @@ impl TenantShard {
})?;
match effect {
TimelineInitAndSyncResult::ReadyToActivate(_) => {
TimelineInitAndSyncResult::ReadyToActivate => {
// activation happens later, on Tenant::activate
}
TimelineInitAndSyncResult::NeedsSpawnImportPgdata(
@@ -1778,13 +1752,24 @@ impl TenantShard {
guard,
},
) => {
tokio::task::spawn(self.clone().create_timeline_import_pgdata_task(
timeline,
import_pgdata,
ActivateTimelineArgs::No,
guard,
ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Warn),
));
let timeline_id = timeline.timeline_id;
let import_task_handle =
tokio::task::spawn(self.clone().create_timeline_import_pgdata_task(
timeline.clone(),
import_pgdata,
guard,
ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Warn),
));
let prev = self.timelines_importing.lock().unwrap().insert(
timeline_id,
ImportingTimeline {
timeline: timeline.clone(),
import_task_handle,
},
);
assert!(prev.is_none());
}
}
}
@@ -2678,14 +2663,7 @@ impl TenantShard {
.await?
}
CreateTimelineParams::ImportPgdata(params) => {
self.create_timeline_import_pgdata(
params,
ActivateTimelineArgs::Yes {
broker_client: broker_client.clone(),
},
ctx,
)
.await?
self.create_timeline_import_pgdata(params, ctx).await?
}
};
@@ -2759,7 +2737,6 @@ impl TenantShard {
async fn create_timeline_import_pgdata(
self: &Arc<Self>,
params: CreateTimelineParamsImportPgdata,
activate: ActivateTimelineArgs,
ctx: &RequestContext,
) -> Result<CreateTimelineResult, CreateTimelineError> {
let CreateTimelineParamsImportPgdata {
@@ -2840,24 +2817,71 @@ impl TenantShard {
let (timeline, timeline_create_guard) = uninit_timeline.finish_creation_myself();
tokio::spawn(self.clone().create_timeline_import_pgdata_task(
let import_task_handle = tokio::spawn(self.clone().create_timeline_import_pgdata_task(
timeline.clone(),
index_part,
activate,
timeline_create_guard,
timeline_ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Warn),
));
let prev = self.timelines_importing.lock().unwrap().insert(
timeline.timeline_id,
ImportingTimeline {
timeline: timeline.clone(),
import_task_handle,
},
);
// Idempotency is enforced higher up the stack
assert!(prev.is_none());
// NB: the timeline doesn't exist in self.timelines at this point
Ok(CreateTimelineResult::ImportSpawned(timeline))
}
/// Finalize the import of a timeline on this shard by marking it complete in
/// the index part. If the import task hasn't finished yet, returns an error.
///
/// This method is idempotent. If the import was finalized once, the next call
/// will be a no-op.
pub(crate) async fn finalize_importing_timeline(
&self,
timeline_id: TimelineId,
) -> anyhow::Result<()> {
let timeline = {
let locked = self.timelines_importing.lock().unwrap();
match locked.get(&timeline_id) {
Some(importing_timeline) => {
if !importing_timeline.import_task_handle.is_finished() {
return Err(anyhow::anyhow!("Import task not done yet"));
}
importing_timeline.timeline.clone()
}
None => {
return Ok(());
}
}
};
timeline
.remote_client
.schedule_index_upload_for_import_pgdata_finalize()?;
timeline.remote_client.wait_completion().await?;
self.timelines_importing
.lock()
.unwrap()
.remove(&timeline_id);
Ok(())
}
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))]
async fn create_timeline_import_pgdata_task(
self: Arc<TenantShard>,
timeline: Arc<Timeline>,
index_part: import_pgdata::index_part_format::Root,
activate: ActivateTimelineArgs,
timeline_create_guard: TimelineCreateGuard,
ctx: RequestContext,
) {
@@ -2869,7 +2893,6 @@ impl TenantShard {
.create_timeline_import_pgdata_task_impl(
timeline,
index_part,
activate,
timeline_create_guard,
ctx,
)
@@ -2885,60 +2908,15 @@ impl TenantShard {
self: Arc<TenantShard>,
timeline: Arc<Timeline>,
index_part: import_pgdata::index_part_format::Root,
activate: ActivateTimelineArgs,
timeline_create_guard: TimelineCreateGuard,
_timeline_create_guard: TimelineCreateGuard,
ctx: RequestContext,
) -> Result<(), anyhow::Error> {
info!("importing pgdata");
let ctx = ctx.with_scope_timeline(&timeline);
import_pgdata::doit(&timeline, index_part, &ctx, self.cancel.clone())
.await
.context("import")?;
info!("import done");
//
// Reload timeline from remote.
// This proves that the remote state is attachable, and it reuses the code.
//
// TODO: think about whether this is safe to do with concurrent TenantShard::shutdown.
// timeline_create_guard hols the tenant gate open, so, shutdown cannot _complete_ until we exit.
// But our activate() call might launch new background tasks after TenantShard::shutdown
// already went past shutting down the TenantShard::timelines, which this timeline here is no part of.
// I think the same problem exists with the bootstrap & branch mgmt API tasks (tenant shutting
// down while bootstrapping/branching + activating), but, the race condition is much more likely
// to manifest because of the long runtime of this import task.
// in theory this shouldn't even .await anything except for coop yield
info!("shutting down timeline");
timeline.shutdown(ShutdownMode::Hard).await;
info!("timeline shut down, reloading from remote");
// TODO: we can't do the following check because create_timeline_import_pgdata must return an Arc<Timeline>
// let Some(timeline) = Arc::into_inner(timeline) else {
// anyhow::bail!("implementation error: timeline that we shut down was still referenced from somewhere");
// };
let timeline_id = timeline.timeline_id;
// load from object storage like TenantShard::attach does
let resources = self.build_timeline_resources(timeline_id);
let index_part = resources
.remote_client
.download_index_file(&self.cancel)
.await?;
let index_part = match index_part {
MaybeDeletedIndexPart::Deleted(_) => {
// likely concurrent delete call, cplane should prevent this
anyhow::bail!(
"index part says deleted but we are not done creating yet, this should not happen but"
)
}
MaybeDeletedIndexPart::IndexPart(p) => p,
};
let metadata = index_part.metadata.clone();
self
.load_remote_timeline(timeline_id, index_part, metadata, None, resources, LoadTimelineCause::ImportPgdata{
create_guard: timeline_create_guard, activate, }, &ctx)
.await?
.ready_to_activate()
.context("implementation error: reloaded timeline still needs import after import reported success")?;
info!("import done - waiting for activation");
anyhow::Ok(())
}
@@ -3475,6 +3453,14 @@ impl TenantShard {
timeline.defuse_for_tenant_drop();
});
}
{
let mut timelines_importing = self.timelines_importing.lock().unwrap();
timelines_importing
.drain()
.for_each(|(_timeline_id, importing_timeline)| {
importing_timeline.shutdown();
});
}
// test_long_timeline_create_then_tenant_delete is leaning on this message
tracing::info!("Waiting for timelines...");
while let Some(res) = js.join_next().await {
@@ -3949,13 +3935,6 @@ where
Ok(result)
}
enum ActivateTimelineArgs {
Yes {
broker_client: storage_broker::BrokerClientChannel,
},
No,
}
impl TenantShard {
pub fn tenant_specific_overrides(&self) -> pageserver_api::models::TenantConfig {
self.tenant_conf.load().tenant_conf.clone()
@@ -4322,6 +4301,7 @@ impl TenantShard {
timelines: Mutex::new(HashMap::new()),
timelines_creating: Mutex::new(HashSet::new()),
timelines_offloaded: Mutex::new(HashMap::new()),
timelines_importing: Mutex::new(HashMap::new()),
remote_tenant_manifest: Default::default(),
gc_cs: tokio::sync::Mutex::new(()),
walredo_mgr,

View File

@@ -88,10 +88,6 @@ impl EphemeralFile {
gate.enter()?,
);
file.fallocate(0, 1 * 1024 * 1024 * 1024, ctx)
.await
.unwrap();
let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore
Ok(EphemeralFile {

View File

@@ -949,6 +949,35 @@ impl RemoteTimelineClient {
Ok(())
}
/// If the `import_pgdata` field marks the timeline as having an import in progress,
/// launch an index-file upload operation that transitions it to done in the background
pub(crate) fn schedule_index_upload_for_import_pgdata_finalize(
self: &Arc<Self>,
) -> anyhow::Result<()> {
use import_pgdata::index_part_format;
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
let to_update = match &upload_queue.dirty.import_pgdata {
Some(import) if !import.is_done() => Some(import),
Some(_) | None => None,
};
if let Some(old) = to_update {
let new =
index_part_format::Root::V1(index_part_format::V1::Done(index_part_format::Done {
idempotency_key: old.idempotency_key().clone(),
started_at: *old.started_at(),
finished_at: chrono::Utc::now().naive_utc(),
}));
upload_queue.dirty.import_pgdata = Some(new);
self.schedule_index_upload(upload_queue);
}
Ok(())
}
/// Launch an index-file upload operation in the background, setting `gc_compaction_state` field.
pub(crate) fn schedule_index_upload_for_gc_compaction_state_update(
self: &Arc<Self>,

View File

@@ -76,8 +76,6 @@ pub async fn download_layer_file<'a>(
layer_metadata.generation,
);
let expected = layer_metadata.file_size;
let (bytes_amount, temp_file) = download_retry(
|| async {
// TempVirtualFile requires us to never reuse a filename while an old
@@ -105,16 +103,6 @@ pub async fn download_layer_file<'a>(
.map_err(DownloadError::Other)?,
gate.enter().map_err(|_| DownloadError::Cancelled)?,
);
{
temp_file.fallocate(
0,
layer_metadata.file_size.next_multiple_of(
64 * 1024 /* TODO this is the max roundtup size by the buffered writer set_len_then_truncate */
),
ctx,
).await.unwrap();
};
download_object(storage, &remote_path, temp_file, gate, cancel, ctx).await
},
&format!("download {remote_path:?}"),
@@ -122,6 +110,7 @@ pub async fn download_layer_file<'a>(
)
.await?;
let expected = layer_metadata.file_size;
if expected != bytes_amount {
return Err(DownloadError::Other(anyhow!(
"According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {:?}",

View File

@@ -441,10 +441,6 @@ impl DeltaLayerWriterInner {
gate.enter()?,
);
file.fallocate(0, 1 * 1024 * 1024 * 1024, ctx)
.await
.unwrap();
// Start at PAGE_SZ, make room for the header block
let blob_writer = BlobWriter::new(
file,

View File

@@ -799,10 +799,6 @@ impl ImageLayerWriterInner {
gate.enter()?,
);
file.fallocate(0, 1 * 1024 * 1024 * 1024, ctx)
.await
.unwrap();
// Start at `PAGE_SZ` to make room for the header block.
let blob_writer = BlobWriter::new(
file,

View File

@@ -204,7 +204,13 @@ pub struct TimelineResources {
/// value can be used to also update the cache, see [`Timeline::update_cached_rel_size`].
pub(crate) struct RelSizeCache {
pub(crate) complete_as_of: Lsn,
pub(crate) map: HashMap<RelTag, (Lsn, BlockNumber)>,
pub(crate) map: HashMap<RelTag, (Lsn, RelSizeCacheEntry)>,
}
#[derive(Debug, Copy, Clone)]
pub enum RelSizeCacheEntry {
Present(BlockNumber),
Truncated,
}
pub struct Timeline {
@@ -690,15 +696,15 @@ impl std::fmt::Display for ReadPath {
#[derive(thiserror::Error)]
pub struct MissingKeyError {
keyspace: KeySpace,
shard: ShardNumber,
query: Option<VersionedKeySpaceQuery>,
pub keyspace: KeySpace,
pub shard: ShardNumber,
pub query: Option<VersionedKeySpaceQuery>,
// This is largest request LSN from the get page request batch
original_hwm_lsn: Lsn,
ancestor_lsn: Option<Lsn>,
pub original_hwm_lsn: Lsn,
pub ancestor_lsn: Option<Lsn>,
/// Debug information about the read path if there's an error
read_path: Option<ReadPath>,
backtrace: Option<std::backtrace::Backtrace>,
pub read_path: Option<ReadPath>,
pub backtrace: Option<std::backtrace::Backtrace>,
}
impl MissingKeyError {
@@ -2127,22 +2133,14 @@ impl Timeline {
debug_assert_current_span_has_tenant_and_timeline_id();
// Regardless of whether we're going to try_freeze_and_flush
// or not, stop ingesting any more data. Walreceiver only provides
// cancellation but no "wait until gone", because it uses the Timeline::gate.
// So, only after the self.gate.close() below will we know for sure that
// no walreceiver tasks are left.
// For `try_freeze_and_flush=true`, this means that we might still be ingesting
// data during the call to `self.freeze_and_flush()` below.
// That's not ideal, but, we don't have the concept of a ChildGuard,
// which is what we'd need to properly model early shutdown of the walreceiver
// task sub-tree before the other Timeline task sub-trees.
// or not, stop ingesting any more data.
let walreceiver = self.walreceiver.lock().unwrap().take();
tracing::debug!(
is_some = walreceiver.is_some(),
"Waiting for WalReceiverManager..."
);
if let Some(walreceiver) = walreceiver {
walreceiver.cancel();
walreceiver.shutdown().await;
}
// ... and inform any waiters for newer LSNs that there won't be any.
self.last_record_lsn.shutdown();

View File

@@ -1277,6 +1277,8 @@ impl Timeline {
return Ok(CompactionOutcome::YieldForL0);
}
let gc_cutoff = *self.applied_gc_cutoff_lsn.read();
// 2. Repartition and create image layers if necessary
match self
.repartition(
@@ -1287,7 +1289,7 @@ impl Timeline {
)
.await
{
Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
Ok(((dense_partitioning, sparse_partitioning), lsn)) if lsn >= gc_cutoff => {
// Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
let image_ctx = RequestContextBuilder::from(ctx)
.access_stats_behavior(AccessStatsBehavior::Skip)
@@ -1341,6 +1343,10 @@ impl Timeline {
}
}
Ok(_) => {
info!("skipping repartitioning due to image compaction LSN being below GC cutoff");
}
// Suppress errors when cancelled.
Err(_) if self.cancel.is_cancelled() => {}
Err(err) if err.is_cancel() => {}
@@ -3429,6 +3435,7 @@ impl Timeline {
// Step 2: Produce images+deltas.
let mut accumulated_values = Vec::new();
let mut accumulated_values_estimated_size = 0;
let mut last_key: Option<Key> = None;
// Only create image layers when there is no ancestor branches. TODO: create covering image layer
@@ -3605,7 +3612,18 @@ impl Timeline {
if last_key.is_none() {
last_key = Some(key);
}
accumulated_values_estimated_size += val.estimated_size();
accumulated_values.push((key, lsn, val));
// Accumulated values should never exceed 512MB.
if accumulated_values_estimated_size >= 1024 * 1024 * 512 {
return Err(CompactionError::Other(anyhow!(
"too many values for a single key: {} for key {}, {} items",
accumulated_values_estimated_size,
key,
accumulated_values.len()
)));
}
} else {
let last_key: &mut Key = last_key.as_mut().unwrap();
stat.on_unique_key_visited(); // TODO: adjust statistics for partial compaction
@@ -3638,6 +3656,7 @@ impl Timeline {
.map_err(CompactionError::Other)?;
accumulated_values.clear();
*last_key = key;
accumulated_values_estimated_size = val.estimated_size();
accumulated_values.push((key, lsn, val));
}
}

View File

@@ -3,6 +3,7 @@ use std::sync::Arc;
use anyhow::{Context, bail};
use pageserver_api::models::ShardImportStatus;
use remote_storage::RemotePath;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::info;
use utils::lsn::Lsn;
@@ -17,6 +18,17 @@ mod importbucket_client;
mod importbucket_format;
pub(crate) mod index_part_format;
pub(crate) struct ImportingTimeline {
pub import_task_handle: JoinHandle<()>,
pub timeline: Arc<Timeline>,
}
impl ImportingTimeline {
pub(crate) fn shutdown(self) {
self.import_task_handle.abort();
}
}
pub async fn doit(
timeline: &Arc<Timeline>,
index_part: index_part_format::Root,
@@ -26,173 +38,161 @@ pub async fn doit(
let index_part_format::Root::V1(v1) = index_part;
let index_part_format::InProgress {
location,
idempotency_key,
started_at,
idempotency_key: _,
started_at: _,
} = match v1 {
index_part_format::V1::Done(_) => return Ok(()),
index_part_format::V1::InProgress(in_progress) => in_progress,
};
let storage = importbucket_client::new(timeline.conf, &location, cancel.clone()).await?;
let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel);
let status_prefix = RemotePath::from_string("status").unwrap();
let shard_status = storcon_client
.get_timeline_import_status(timeline.tenant_shard_id, timeline.timeline_id)
.await
.map_err(|_err| anyhow::anyhow!("Shut down while getting timeline import status"))?;
//
// See if shard is done.
// TODO: incorporate generations into status key for split brain safety. Figure out together with checkpointing.
//
let shard_status_key =
status_prefix.join(format!("shard-{}", timeline.tenant_shard_id.shard_slug()));
let shard_status: Option<importbucket_format::ShardStatus> =
storage.get_json(&shard_status_key).await?;
info!(?shard_status, "peeking shard status");
if shard_status.map(|st| st.done).unwrap_or(false) {
info!("shard status indicates that the shard is done, skipping import");
} else {
// TODO: checkpoint the progress into the IndexPart instead of restarting
// from the beginning.
match shard_status {
None | Some(ShardImportStatus::InProgress) => {
// TODO: checkpoint the progress into the IndexPart instead of restarting
// from the beginning.
//
// Wipe the slate clean - the flow does not allow resuming.
// We can implement resuming in the future by checkpointing the progress into the IndexPart.
//
info!("wipe the slate clean");
{
// TODO: do we need to hold GC lock for this?
let mut guard = timeline.layers.write().await;
assert!(
guard.layer_map()?.open_layer.is_none(),
"while importing, there should be no in-memory layer" // this just seems like a good place to assert it
);
let all_layers_keys = guard.all_persistent_layers();
let all_layers: Vec<_> = all_layers_keys
.iter()
.map(|key| guard.get_from_key(key))
.collect();
let open = guard.open_mut().context("open_mut")?;
//
// Wipe the slate clean - the flow does not allow resuming.
// We can implement resuming in the future by checkpointing the progress into the IndexPart.
//
info!("wipe the slate clean");
{
// TODO: do we need to hold GC lock for this?
let mut guard = timeline.layers.write().await;
assert!(
guard.layer_map()?.open_layer.is_none(),
"while importing, there should be no in-memory layer" // this just seems like a good place to assert it
);
let all_layers_keys = guard.all_persistent_layers();
let all_layers: Vec<_> = all_layers_keys
.iter()
.map(|key| guard.get_from_key(key))
.collect();
let open = guard.open_mut().context("open_mut")?;
timeline.remote_client.schedule_gc_update(&all_layers)?;
open.finish_gc_timeline(&all_layers);
}
//
// Wait for pgdata to finish uploading
//
info!("wait for pgdata to reach status 'done'");
let pgdata_status_key = status_prefix.join("pgdata");
loop {
let res = async {
let pgdata_status: Option<importbucket_format::PgdataStatus> = storage
.get_json(&pgdata_status_key)
.await
.context("get pgdata status")?;
info!(?pgdata_status, "peeking pgdata status");
if pgdata_status.map(|st| st.done).unwrap_or(false) {
Ok(())
} else {
Err(anyhow::anyhow!("pgdata not done yet"))
}
timeline.remote_client.schedule_gc_update(&all_layers)?;
open.finish_gc_timeline(&all_layers);
}
.await;
match res {
Ok(_) => break,
Err(err) => {
info!(?err, "indefinitely waiting for pgdata to finish");
if tokio::time::timeout(std::time::Duration::from_secs(10), cancel.cancelled())
//
// Wait for pgdata to finish uploading
//
info!("wait for pgdata to reach status 'done'");
let storage =
importbucket_client::new(timeline.conf, &location, cancel.clone()).await?;
let status_prefix = RemotePath::from_string("status").unwrap();
let pgdata_status_key = status_prefix.join("pgdata");
loop {
let res = async {
let pgdata_status: Option<importbucket_format::PgdataStatus> = storage
.get_json(&pgdata_status_key)
.await
.context("get pgdata status")?;
info!(?pgdata_status, "peeking pgdata status");
if pgdata_status.map(|st| st.done).unwrap_or(false) {
Ok(())
} else {
Err(anyhow::anyhow!("pgdata not done yet"))
}
}
.await;
match res {
Ok(_) => break,
Err(err) => {
info!(?err, "indefinitely waiting for pgdata to finish");
if tokio::time::timeout(
std::time::Duration::from_secs(10),
cancel.cancelled(),
)
.await
.is_ok()
{
bail!("cancelled while waiting for pgdata");
{
bail!("cancelled while waiting for pgdata");
}
}
}
}
}
//
// Do the import
//
info!("do the import");
let control_file = storage.get_control_file().await?;
let base_lsn = control_file.base_lsn();
//
// Do the import
//
info!("do the import");
let control_file = storage.get_control_file().await?;
let base_lsn = control_file.base_lsn();
info!("update TimelineMetadata based on LSNs from control file");
{
let pg_version = control_file.pg_version();
let _ctx: &RequestContext = ctx;
async move {
// FIXME: The 'disk_consistent_lsn' should be the LSN at the *end* of the
// checkpoint record, and prev_record_lsn should point to its beginning.
// We should read the real end of the record from the WAL, but here we
// just fake it.
let disk_consistent_lsn = Lsn(base_lsn.0 + 8);
let prev_record_lsn = base_lsn;
let metadata = TimelineMetadata::new(
disk_consistent_lsn,
Some(prev_record_lsn),
None, // no ancestor
Lsn(0), // no ancestor lsn
base_lsn, // latest_gc_cutoff_lsn
base_lsn, // initdb_lsn
pg_version,
);
info!("update TimelineMetadata based on LSNs from control file");
{
let pg_version = control_file.pg_version();
let _ctx: &RequestContext = ctx;
async move {
// FIXME: The 'disk_consistent_lsn' should be the LSN at the *end* of the
// checkpoint record, and prev_record_lsn should point to its beginning.
// We should read the real end of the record from the WAL, but here we
// just fake it.
let disk_consistent_lsn = Lsn(base_lsn.0 + 8);
let prev_record_lsn = base_lsn;
let metadata = TimelineMetadata::new(
disk_consistent_lsn,
Some(prev_record_lsn),
None, // no ancestor
Lsn(0), // no ancestor lsn
base_lsn, // latest_gc_cutoff_lsn
base_lsn, // initdb_lsn
pg_version,
);
let _start_lsn = disk_consistent_lsn + 1;
let _start_lsn = disk_consistent_lsn + 1;
timeline
.remote_client
.schedule_index_upload_for_full_metadata_update(&metadata)?;
timeline
.remote_client
.schedule_index_upload_for_full_metadata_update(&metadata)?;
timeline.remote_client.wait_completion().await?;
timeline.remote_client.wait_completion().await?;
anyhow::Ok(())
anyhow::Ok(())
}
}
.await?;
flow::run(timeline.clone(), control_file, storage.clone(), ctx).await?;
// Communicate that shard is done.
// Ensure at-least-once delivery of the upcall to storage controller
// before we mark the task as done and never come here again.
//
// Note that we do not mark the import complete in the index part now.
// This happens in [`Tenant::finalize_importing_timeline`] in response
// to the storage controller calling
// `/v1/tenant/:tenant_id/timeline/:timeline_id/activate_post_import`.
storcon_client
.put_timeline_import_status(
timeline.tenant_shard_id,
timeline.timeline_id,
// TODO(vlad): What about import errors?
ShardImportStatus::Done,
)
.await
.map_err(|_err| {
anyhow::anyhow!("Shut down while putting timeline import status")
})?;
}
Some(ShardImportStatus::Error(err)) => {
info!(
"shard status indicates that the shard is done (error), skipping import {}",
err
);
}
Some(ShardImportStatus::Done) => {
info!("shard status indicates that the shard is done (success), skipping import");
}
.await?;
flow::run(timeline.clone(), control_file, storage.clone(), ctx).await?;
//
// Communicate that shard is done.
// Ensure at-least-once delivery of the upcall to storage controller
// before we mark the task as done and never come here again.
//
let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel);
storcon_client
.put_timeline_import_status(
timeline.tenant_shard_id,
timeline.timeline_id,
// TODO(vlad): What about import errors?
ShardImportStatus::Done,
)
.await
.map_err(|_err| anyhow::anyhow!("Shut down while putting timeline import status"))?;
storage
.put_json(
&shard_status_key,
&importbucket_format::ShardStatus { done: true },
)
.await
.context("put shard status")?;
}
//
// Mark as done in index_part.
// This makes subsequent timeline loads enter the normal load code path
// instead of spawning the import task and calling this here function.
//
info!("mark import as complete in index part");
timeline
.remote_client
.schedule_index_upload_for_import_pgdata_state_update(Some(index_part_format::Root::V1(
index_part_format::V1::Done(index_part_format::Done {
idempotency_key,
started_at,
finished_at: chrono::Utc::now().naive_utc(),
}),
)))?;
timeline.remote_client.wait_completion().await?;
Ok(())
}

View File

@@ -53,6 +53,7 @@ use tokio_stream::StreamExt;
use tracing::{debug, instrument};
use utils::bin_ser::BeSer;
use utils::lsn::Lsn;
use utils::pausable_failpoint;
use super::Timeline;
use super::importbucket_client::{ControlFile, RemoteStorageWrapper};
@@ -79,6 +80,9 @@ pub async fn run(
let import_config = &timeline.conf.timeline_import_config;
let plan = planner.plan(import_config).await?;
pausable_failpoint!("import-timeline-pre-execute-pausable");
plan.execute(timeline, import_config, ctx).await
}

View File

@@ -190,31 +190,6 @@ impl RemoteStorageWrapper {
Ok(Some(res))
}
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
pub async fn put_json<T>(&self, path: &RemotePath, value: &T) -> anyhow::Result<()>
where
T: serde::Serialize,
{
let buf = serde_json::to_vec(value)?;
let bytes = Bytes::from(buf);
utils::backoff::retry(
|| async {
let size = bytes.len();
let bytes = futures::stream::once(futures::future::ready(Ok(bytes.clone())));
self.storage
.upload_storage_object(bytes, size, path, &self.cancel)
.await
},
remote_storage::TimeoutOrCancel::caused_by_cancel,
1,
u32::MAX,
&format!("put json {path}"),
&self.cancel,
)
.await
.expect("practically infinite retries")
}
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
pub async fn get_range(
&self,

View File

@@ -5,9 +5,3 @@ pub struct PgdataStatus {
pub done: bool,
// TODO: remaining fields
}
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)]
pub struct ShardStatus {
pub done: bool,
// TODO: remaining fields
}

View File

@@ -64,4 +64,12 @@ impl Root {
},
}
}
pub fn started_at(&self) -> &chrono::NaiveDateTime {
match self {
Root::V1(v1) => match v1 {
V1::InProgress(in_progress) => &in_progress.started_at,
V1::Done(done) => &done.started_at,
},
}
}
}

View File

@@ -63,6 +63,7 @@ pub struct WalReceiver {
/// All task spawned by [`WalReceiver::start`] and its children are sensitive to this token.
/// It's a child token of [`Timeline`] so that timeline shutdown can cancel WalReceiver tasks early for `freeze_and_flush=true`.
cancel: CancellationToken,
task: tokio::task::JoinHandle<()>,
}
impl WalReceiver {
@@ -79,7 +80,7 @@ impl WalReceiver {
let loop_status = Arc::new(std::sync::RwLock::new(None));
let manager_status = Arc::clone(&loop_status);
let cancel = timeline.cancel.child_token();
WALRECEIVER_RUNTIME.spawn({
let task = WALRECEIVER_RUNTIME.spawn({
let cancel = cancel.clone();
async move {
debug_assert_current_span_has_tenant_and_timeline_id();
@@ -120,14 +121,25 @@ impl WalReceiver {
Self {
manager_status,
cancel,
task,
}
}
#[instrument(skip_all, level = tracing::Level::DEBUG)]
pub fn cancel(&self) {
pub async fn shutdown(self) {
debug_assert_current_span_has_tenant_and_timeline_id();
debug!("cancelling walreceiver tasks");
self.cancel.cancel();
match self.task.await {
Ok(()) => debug!("Shutdown success"),
Err(je) if je.is_cancelled() => unreachable!("not used"),
Err(je) if je.is_panic() => {
// already logged by panic hook
}
Err(je) => {
error!("shutdown walreceiver task join error: {je}")
}
}
}
pub(crate) fn status(&self) -> Option<ConnectionManagerStatus> {

View File

@@ -74,6 +74,8 @@ pub struct VirtualFile {
impl VirtualFile {
/// Open a file in read-only mode. Like File::open.
///
/// Insensitive to `virtual_file_io_mode` setting.
pub async fn open<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
@@ -95,31 +97,20 @@ impl VirtualFile {
Self::open_with_options_v2(path.as_ref(), OpenOptions::new().read(true), ctx).await
}
/// `O_DIRECT` will be enabled base on `virtual_file_io_mode`.
pub async fn open_with_options_v2<P: AsRef<Utf8Path>>(
path: P,
#[cfg_attr(not(target_os = "linux"), allow(unused_mut))] mut open_options: OpenOptions,
mut open_options: OpenOptions,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
let mode = get_io_mode();
let set_o_direct = match (mode, open_options.is_write()) {
let direct = match (mode, open_options.is_write()) {
(IoMode::Buffered, _) => false,
#[cfg(target_os = "linux")]
(IoMode::Direct, false) => true,
#[cfg(target_os = "linux")]
(IoMode::Direct, true) => false,
#[cfg(target_os = "linux")]
(IoMode::DirectRw, _) => true,
};
if set_o_direct {
#[cfg(target_os = "linux")]
{
open_options = open_options.custom_flags(nix::libc::O_DIRECT);
}
#[cfg(not(target_os = "linux"))]
unreachable!(
"O_DIRECT is not supported on this platform, IoMode's that result in set_o_direct=true shouldn't even be defined"
);
}
open_options = open_options.direct(direct);
let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
Ok(VirtualFile { inner, _mode: mode })
}
@@ -154,15 +145,6 @@ impl VirtualFile {
self.inner.set_len(len, ctx).await
}
pub async fn fallocate(
&self,
offset: u64,
size: u64,
ctx: &RequestContext,
) -> Result<(), Error> {
self.inner.fallocate(offset, size, ctx).await
}
pub async fn metadata(&self) -> Result<Metadata, Error> {
self.inner.metadata().await
}
@@ -641,18 +623,6 @@ impl VirtualFileInner {
})
}
pub async fn fallocate(
&self,
offset: u64,
size: u64,
_ctx: &RequestContext,
) -> Result<(), Error> {
with_file!(self, StorageIoOperation::Fallocate, |file_guard| {
let (_file_guard, res) = io_engine::get().fallocate(file_guard, offset, size).await;
res.maybe_fatal_err("fallocate") // TODO haven't thought about this
})
}
/// Helper function internal to `VirtualFile` that looks up the underlying File,
/// opens it and evicts some other File if necessary. The passed parameter is
/// assumed to be a function available for the physical `File`.
@@ -812,6 +782,12 @@ impl VirtualFileInner {
where
Buf: tokio_epoll_uring::IoBufMut + Send,
{
self.validate_direct_io(
Slice::stable_ptr(&buf).addr(),
Slice::bytes_total(&buf),
offset,
);
let file_guard = match self
.lock_file()
.await
@@ -837,6 +813,8 @@ impl VirtualFileInner {
offset: u64,
ctx: &RequestContext,
) -> (FullSlice<B>, Result<usize, Error>) {
self.validate_direct_io(buf.as_ptr().addr(), buf.len(), offset);
let file_guard = match self.lock_file().await {
Ok(file_guard) => file_guard,
Err(e) => return (buf, Err(e)),
@@ -851,6 +829,64 @@ impl VirtualFileInner {
(buf, result)
})
}
/// Validate all reads and writes to adhere to the O_DIRECT requirements of our production systems.
///
/// Validating it iin userspace sets a consistent bar, independent of what actual OS/filesystem/block device is in use.
fn validate_direct_io(&self, addr: usize, size: usize, offset: u64) {
// TODO: eventually enable validation in the builds we use in real environments like staging, preprod, and prod.
if !(cfg!(feature = "testing") || cfg!(test)) {
return;
}
if !self.open_options.is_direct() {
return;
}
// Validate buffer memory alignment.
//
// What practically matters as of Linux 6.1 is bdev_dma_alignment()
// which is practically between 512 and 4096.
// On our production systems, the value is 512.
// The IoBuffer/IoBufferMut hard-code that value.
//
// Because the alloctor might return _more_ aligned addresses than requested,
// there is a chance that testing would not catch violations of a runtime requirement stricter than 512.
{
let requirement = 512;
let remainder = addr % requirement;
assert!(
remainder == 0,
"Direct I/O buffer must be aligned: buffer_addr=0x{addr:x} % 0x{requirement:x} = 0x{remainder:x}"
);
}
// Validate offset alignment.
//
// We hard-code 512 throughout the code base.
// So enforce just that and not anything more restrictive.
// Even the shallowest testing will expose more restrictive requirements if those ever arise.
{
let requirement = 512;
let remainder = offset % requirement;
assert!(
remainder == 0,
"Direct I/O offset must be aligned: offset=0x{offset:x} % 0x{requirement:x} = 0x{remainder:x}"
);
}
// Validate buffer size multiple requirement.
//
// The requirement in Linux 6.1 is bdev_logical_block_size().
// On our production systems, that is 512.
{
let requirement = 512;
let remainder = size % requirement;
assert!(
remainder == 0,
"Direct I/O buffer size must be a multiple of {requirement}: size=0x{size:x} % 0x{requirement:x} = 0x{remainder:x}"
);
}
}
}
// Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
@@ -1239,7 +1275,6 @@ mod tests {
use std::sync::Arc;
use owned_buffers_io::io_buf_ext::IoBufExt;
use owned_buffers_io::slice::SliceMutExt;
use rand::seq::SliceRandom;
use rand::{Rng, thread_rng};
@@ -1247,162 +1282,38 @@ mod tests {
use crate::context::DownloadBehavior;
use crate::task_mgr::TaskKind;
enum MaybeVirtualFile {
VirtualFile(VirtualFile),
File(File),
}
impl From<VirtualFile> for MaybeVirtualFile {
fn from(vf: VirtualFile) -> Self {
MaybeVirtualFile::VirtualFile(vf)
}
}
impl MaybeVirtualFile {
async fn read_exact_at(
&self,
mut slice: tokio_epoll_uring::Slice<IoBufferMut>,
offset: u64,
ctx: &RequestContext,
) -> Result<tokio_epoll_uring::Slice<IoBufferMut>, Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(slice, offset, ctx).await,
MaybeVirtualFile::File(file) => {
let rust_slice: &mut [u8] = slice.as_mut_rust_slice_full_zeroed();
file.read_exact_at(rust_slice, offset).map(|()| slice)
}
}
}
async fn write_all_at<Buf: IoBufAligned + Send>(
&self,
buf: FullSlice<Buf>,
offset: u64,
ctx: &RequestContext,
) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => {
let (_buf, res) = file.write_all_at(buf, offset, ctx).await;
res
}
MaybeVirtualFile::File(file) => file.write_all_at(&buf[..], offset),
}
}
// Helper function to slurp a portion of a file into a string
async fn read_string_at(
&mut self,
pos: u64,
len: usize,
ctx: &RequestContext,
) -> Result<String, Error> {
let slice = IoBufferMut::with_capacity(len).slice_full();
assert_eq!(slice.bytes_total(), len);
let slice = self.read_exact_at(slice, pos, ctx).await?;
let buf = slice.into_inner();
assert_eq!(buf.len(), len);
Ok(String::from_utf8(buf.to_vec()).unwrap())
}
}
#[tokio::test]
async fn test_virtual_files() -> anyhow::Result<()> {
// The real work is done in the test_files() helper function. This
// allows us to run the same set of tests against a native File, and
// VirtualFile. We trust the native Files and wouldn't need to test them,
// but this allows us to verify that the operations return the same
// results with VirtualFiles as with native Files. (Except that with
// native files, you will run out of file descriptors if the ulimit
// is low enough.)
struct A;
impl Adapter for A {
async fn open(
path: Utf8PathBuf,
opts: OpenOptions,
ctx: &RequestContext,
) -> Result<MaybeVirtualFile, anyhow::Error> {
let vf = VirtualFile::open_with_options_v2(&path, opts, ctx).await?;
Ok(MaybeVirtualFile::VirtualFile(vf))
}
}
test_files::<A>("virtual_files").await
}
#[tokio::test]
async fn test_physical_files() -> anyhow::Result<()> {
struct B;
impl Adapter for B {
async fn open(
path: Utf8PathBuf,
opts: OpenOptions,
_ctx: &RequestContext,
) -> Result<MaybeVirtualFile, anyhow::Error> {
Ok(MaybeVirtualFile::File({
let owned_fd = opts.open(path.as_std_path()).await?;
File::from(owned_fd)
}))
}
}
test_files::<B>("physical_files").await
}
/// This is essentially a closure which returns a MaybeVirtualFile, but because rust edition
/// 2024 is not yet out with new lifetime capture or outlives rules, this is a async function
/// in trait which benefits from the new lifetime capture rules already.
trait Adapter {
async fn open(
path: Utf8PathBuf,
opts: OpenOptions,
ctx: &RequestContext,
) -> Result<MaybeVirtualFile, anyhow::Error>;
}
async fn test_files<A>(testname: &str) -> anyhow::Result<()>
where
A: Adapter,
{
let ctx =
RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
let testdir = crate::config::PageServerConf::test_repo_dir(testname);
let testdir = crate::config::PageServerConf::test_repo_dir("test_virtual_files");
std::fs::create_dir_all(&testdir)?;
let zeropad512 = |content: &[u8]| {
let mut buf = IoBufferMut::with_capacity_zeroed(512);
buf[..content.len()].copy_from_slice(content);
buf.freeze().slice_len()
};
let path_a = testdir.join("file_a");
let mut file_a = A::open(
let file_a = VirtualFile::open_with_options_v2(
path_a.clone(),
OpenOptions::new()
.read(true)
.write(true)
// set create & truncate flags to ensure when we trigger a reopen later in this test,
// the reopen_options must have masked out those flags; if they don't, then
// the after reopen we will fail to read the `content_a` that we write here.
.create(true)
.truncate(true)
.to_owned(),
.truncate(true),
&ctx,
)
.await?;
let (_, res) = file_a.write_all_at(zeropad512(b"content_a"), 0, &ctx).await;
res?;
file_a
.write_all_at(IoBuffer::from(b"foobar").slice_len(), 0, &ctx)
.await?;
// cannot read from a file opened in write-only mode
let _ = file_a.read_string_at(0, 1, &ctx).await.unwrap_err();
// Close the file and re-open for reading
let mut file_a = A::open(path_a, OpenOptions::new().read(true), &ctx).await?;
// cannot write to a file opened in read-only mode
let _ = file_a
.write_all_at(IoBuffer::from(b"bar").slice_len(), 0, &ctx)
.await
.unwrap_err();
// Try simple read
assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?);
// Create another test file, and try FileExt functions on it.
let path_b = testdir.join("file_b");
let mut file_b = A::open(
let file_b = VirtualFile::open_with_options_v2(
path_b.clone(),
OpenOptions::new()
.read(true)
@@ -1412,37 +1323,44 @@ mod tests {
&ctx,
)
.await?;
file_b
.write_all_at(IoBuffer::from(b"BAR").slice_len(), 3, &ctx)
.await?;
file_b
.write_all_at(IoBuffer::from(b"FOO").slice_len(), 0, &ctx)
.await?;
let (_, res) = file_b.write_all_at(zeropad512(b"content_b"), 0, &ctx).await;
res?;
assert_eq!(file_b.read_string_at(2, 3, &ctx).await?, "OBA");
let assert_first_512_eq = async |vfile: &VirtualFile, expect: &[u8]| {
let buf = vfile
.read_exact_at(IoBufferMut::with_capacity_zeroed(512).slice_full(), 0, &ctx)
.await
.unwrap();
assert_eq!(&buf[..], &zeropad512(expect)[..]);
};
// Open a lot of files, enough to cause some evictions. (Or to be precise,
// open the same file many times. The effect is the same.)
// Open a lot of file descriptors / VirtualFile instances.
// Enough to cause some evictions in the fd cache.
let mut vfiles = Vec::new();
let mut file_b_dupes = Vec::new();
for _ in 0..100 {
let mut vfile = A::open(path_b.clone(), OpenOptions::new().read(true), &ctx).await?;
assert_eq!("FOOBAR", vfile.read_string_at(0, 6, &ctx).await?);
vfiles.push(vfile);
let vfile = VirtualFile::open_with_options_v2(
path_b.clone(),
OpenOptions::new().read(true),
&ctx,
)
.await?;
assert_first_512_eq(&vfile, b"content_b").await;
file_b_dupes.push(vfile);
}
// make sure we opened enough files to definitely cause evictions.
assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2);
assert!(file_b_dupes.len() > TEST_MAX_FILE_DESCRIPTORS * 2);
// The underlying file descriptor for 'file_a' should be closed now. Try to read
// from it again.
assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?);
// from it again. The VirtualFile reopens the file internally.
assert_first_512_eq(&file_a, b"content_a").await;
// Check that all the other FDs still work too. Use them in random order for
// good measure.
vfiles.as_mut_slice().shuffle(&mut thread_rng());
for vfile in vfiles.iter_mut() {
assert_eq!("OOBAR", vfile.read_string_at(1, 5, &ctx).await?);
file_b_dupes.as_mut_slice().shuffle(&mut thread_rng());
for vfile in file_b_dupes.iter_mut() {
assert_first_512_eq(vfile, b"content_b").await;
}
Ok(())
@@ -1473,7 +1391,7 @@ mod tests {
// Open the file many times.
let mut files = Vec::new();
for _ in 0..VIRTUAL_FILES {
let f = VirtualFileInner::open_with_options(
let f = VirtualFile::open_with_options_v2(
&test_file_path,
OpenOptions::new().read(true),
&ctx,
@@ -1518,8 +1436,6 @@ mod tests {
#[tokio::test]
async fn test_atomic_overwrite_basic() {
let ctx =
RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
let testdir = crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_basic");
std::fs::create_dir_all(&testdir).unwrap();
@@ -1529,26 +1445,22 @@ mod tests {
VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
let post = file.read_string_at(0, 3, &ctx).await.unwrap();
let post = std::fs::read_to_string(&path).unwrap();
assert_eq!(post, "foo");
assert!(!tmp_path.exists());
drop(file);
VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
let post = file.read_string_at(0, 3, &ctx).await.unwrap();
let post = std::fs::read_to_string(&path).unwrap();
assert_eq!(post, "bar");
assert!(!tmp_path.exists());
drop(file);
}
#[tokio::test]
async fn test_atomic_overwrite_preexisting_tmp() {
let ctx =
RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
let testdir =
crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_preexisting_tmp");
std::fs::create_dir_all(&testdir).unwrap();
@@ -1563,10 +1475,8 @@ mod tests {
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
let post = file.read_string_at(0, 3, &ctx).await.unwrap();
let post = std::fs::read_to_string(&path).unwrap();
assert_eq!(post, "foo");
assert!(!tmp_path.exists());
drop(file);
}
}

View File

@@ -109,7 +109,6 @@ pub(crate) fn get() -> IoEngine {
}
}
use std::os::fd::AsRawFd;
use std::os::unix::prelude::FileExt;
use std::sync::atomic::{AtomicU8, Ordering};
#[cfg(target_os = "linux")]
@@ -241,7 +240,7 @@ impl IoEngine {
}
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
// TODO: ftruncate op for tokio-epoll-uring: https://github.com/neondatabase/neon/issues/11817
// TODO: ftruncate op for tokio-epoll-uring
// Don't forget to use retry_ecanceled_once
let res = file_guard.with_std_file(|std_file| std_file.set_len(len));
(file_guard, res)
@@ -249,51 +248,6 @@ impl IoEngine {
}
}
pub(super) async fn fallocate(
&self,
file_guard: FileGuard,
offset: u64,
len: u64,
) -> (FileGuard, std::io::Result<()>) {
// NB: if you ever think of using FALLOC_FL_KEEP_SIZE, keep
// in mind that I have found it to be punting to io_uring worker threads
// on Debian Bookworm Linux 6.1.0-32-amd64 and 6.12.25 mainline.
// => https://gist.github.com/problame/ed876bea40b915ba53267b8265e99352
match self {
IoEngine::NotSet => panic!("not initialized"),
IoEngine::StdFs => {
let flags = nix::fcntl::FallocateFlags::empty();
let Ok(offset) = nix::libc::off_t::try_from(offset) else {
return (
file_guard,
Err(std::io::Error::from_raw_os_error(nix::libc::EINVAL)),
);
};
let Ok(len) = nix::libc::off_t::try_from(len) else {
return (
file_guard,
Err(std::io::Error::from_raw_os_error(nix::libc::EINVAL)),
);
};
let res = file_guard.with_std_file(|std_file| {
nix::fcntl::fallocate(std_file.as_raw_fd(), flags, offset, len)
});
let res = res.map_err(|e: nix::errno::Errno| e.into());
(file_guard, res)
}
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let flags = tokio_epoll_uring::FallocateFlags::empty();
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (file_guard, res) = retry_ecanceled_once(file_guard, async |file_guard| {
system.fallocate(file_guard, flags, offset, len).await
})
.await;
(file_guard, res.map_err(epoll_uring_error_to_std))
}
}
}
pub(super) async fn write_at<B: IoBuf + Send>(
&self,
file_guard: FileGuard,

View File

@@ -8,7 +8,13 @@ use super::io_engine::IoEngine;
#[derive(Debug, Clone)]
pub struct OpenOptions {
/// We keep a copy of the write() flag we pass to the `inner`` `OptionOptions`
/// to support [`Self::is_write`].
write: bool,
/// We don't expose + pass through a raw `custom_flags()` style API.
/// The only custom flag we support is `O_DIRECT`, which we track here
/// and map to `custom_flags()` in the [`Self::open`] method.
direct: bool,
inner: Inner,
}
#[derive(Debug, Clone)]
@@ -30,6 +36,7 @@ impl Default for OpenOptions {
};
Self {
write: false,
direct: false,
inner,
}
}
@@ -44,6 +51,10 @@ impl OpenOptions {
self.write
}
pub(super) fn is_direct(&self) -> bool {
self.direct
}
pub fn read(mut self, read: bool) -> Self {
match &mut self.inner {
Inner::StdFs(x) => {
@@ -116,13 +127,38 @@ impl OpenOptions {
}
pub(in crate::virtual_file) async fn open(&self, path: &Path) -> std::io::Result<OwnedFd> {
match &self.inner {
Inner::StdFs(x) => x.open(path).map(|file| file.into()),
#[cfg_attr(not(target_os = "linux"), allow(unused_mut))]
let mut custom_flags = 0;
if self.direct {
#[cfg(target_os = "linux")]
Inner::TokioEpollUring(x) => {
{
custom_flags |= nix::libc::O_DIRECT;
}
#[cfg(not(target_os = "linux"))]
{
// Other platforms may be used for development but don't necessarily have a 1:1 equivalent to Linux's O_DIRECT (macOS!).
// Just don't set the flag; to catch alignment bugs typical for O_DIRECT,
// we have a runtime validation layer inside `VirtualFile::write_at` and `VirtualFile::read_at`.
static WARNING: std::sync::Once = std::sync::Once::new();
WARNING.call_once(|| {
let span = tracing::info_span!(parent: None, "open_options");
let _enter = span.enter();
tracing::warn!("your platform is not a supported production platform, ignoing request for O_DIRECT; this could hide alignment bugs; this warning is logged once per process");
});
}
}
match self.inner.clone() {
Inner::StdFs(mut x) => x
.custom_flags(custom_flags)
.open(path)
.map(|file| file.into()),
#[cfg(target_os = "linux")]
Inner::TokioEpollUring(mut x) => {
x.custom_flags(custom_flags);
let system = super::io_engine::tokio_epoll_uring_ext::thread_local_system().await;
let (_, res) = super::io_engine::retry_ecanceled_once((), |()| async {
let res = system.open(path, x).await;
let res = system.open(path, &x).await;
((), res)
})
.await;
@@ -144,19 +180,8 @@ impl OpenOptions {
self
}
pub fn custom_flags(mut self, flags: i32) -> Self {
if flags & nix::libc::O_APPEND != 0 {
super::io_engine::panic_operation_must_be_idempotent();
}
match &mut self.inner {
Inner::StdFs(x) => {
let _ = x.custom_flags(flags);
}
#[cfg(target_os = "linux")]
Inner::TokioEpollUring(x) => {
let _ = x.custom_flags(flags);
}
}
pub fn direct(mut self, direct: bool) -> Self {
self.direct = direct;
self
}
}

View File

@@ -425,15 +425,12 @@ compact_prefetch_buffers(void)
* point inside and outside PostgreSQL.
*
* This still does throw errors when it receives malformed responses from PS.
*
* When we're not called from CHECK_FOR_INTERRUPTS (indicated by
* IsHandlingInterrupts) we also report we've ended prefetch receive work,
* just in case state tracking was lost due to an error in the sync getPage
* response code.
*/
void
communicator_prefetch_pump_state(bool IsHandlingInterrupts)
communicator_prefetch_pump_state(void)
{
START_PREFETCH_RECEIVE_WORK();
while (MyPState->ring_receive != MyPState->ring_flush)
{
NeonResponse *response;
@@ -482,9 +479,7 @@ communicator_prefetch_pump_state(bool IsHandlingInterrupts)
}
}
/* We never pump the prefetch state while handling other pages */
if (!IsHandlingInterrupts)
END_PREFETCH_RECEIVE_WORK();
END_PREFETCH_RECEIVE_WORK();
communicator_reconfigure_timeout_if_needed();
}
@@ -672,9 +667,10 @@ prefetch_wait_for(uint64 ring_index)
Assert(MyPState->ring_unused > ring_index);
START_PREFETCH_RECEIVE_WORK();
while (MyPState->ring_receive <= ring_index)
{
START_PREFETCH_RECEIVE_WORK();
entry = GetPrfSlot(MyPState->ring_receive);
Assert(entry->status == PRFS_REQUESTED);
@@ -683,17 +679,18 @@ prefetch_wait_for(uint64 ring_index)
result = false;
break;
}
END_PREFETCH_RECEIVE_WORK();
CHECK_FOR_INTERRUPTS();
}
if (result)
{
/* Check that slot is actually received (srver can be disconnected in prefetch_pump_state called from CHECK_FOR_INTERRUPTS */
PrefetchRequest *slot = GetPrfSlot(ring_index);
return slot->status == PRFS_RECEIVED;
result = slot->status == PRFS_RECEIVED;
}
return false;
END_PREFETCH_RECEIVE_WORK();
return result;
;
}
@@ -720,6 +717,7 @@ prefetch_read(PrefetchRequest *slot)
Assert(slot->status == PRFS_REQUESTED);
Assert(slot->response == NULL);
Assert(slot->my_ring_index == MyPState->ring_receive);
Assert(readpage_reentrant_guard);
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
@@ -802,6 +800,7 @@ communicator_prefetch_receive(BufferTag tag)
PrfHashEntry *entry;
PrefetchRequest hashkey;
Assert(readpage_reentrant_guard);
hashkey.buftag = tag;
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
if (entry != NULL && prefetch_wait_for(entry->slot->my_ring_index))
@@ -821,8 +820,12 @@ communicator_prefetch_receive(BufferTag tag)
void
prefetch_on_ps_disconnect(void)
{
bool save_readpage_reentrant_guard = readpage_reentrant_guard;
MyPState->ring_flush = MyPState->ring_unused;
/* Prohibit callig of prefetch_pump_state */
START_PREFETCH_RECEIVE_WORK();
while (MyPState->ring_receive < MyPState->ring_unused)
{
PrefetchRequest *slot;
@@ -851,6 +854,9 @@ prefetch_on_ps_disconnect(void)
MyNeonCounters->getpage_prefetch_discards_total += 1;
}
/* Restore guard */
readpage_reentrant_guard = save_readpage_reentrant_guard;
/*
* We can have gone into retry due to network error, so update stats with
* the latest available
@@ -2509,7 +2515,7 @@ communicator_processinterrupts(void)
if (timeout_signaled)
{
if (!readpage_reentrant_guard && readahead_getpage_pull_timeout_ms > 0)
communicator_prefetch_pump_state(true);
communicator_prefetch_pump_state();
timeout_signaled = false;
communicator_reconfigure_timeout_if_needed();

View File

@@ -44,7 +44,7 @@ extern int communicator_read_slru_segment(SlruKind kind, int64 segno,
void *buffer);
extern void communicator_reconfigure_timeout_if_needed(void);
extern void communicator_prefetch_pump_state(bool IsHandlingInterrupts);
extern void communicator_prefetch_pump_state(void);
#endif

View File

@@ -433,7 +433,6 @@ pageserver_connect(shardno_t shard_no, int elevel)
now = GetCurrentTimestamp();
us_since_last_attempt = (int64) (now - shard->last_reconnect_time);
shard->last_reconnect_time = now;
/*
* Make sure we don't do exponential backoff with a constant multiplier
@@ -447,14 +446,23 @@ pageserver_connect(shardno_t shard_no, int elevel)
/*
* If we did other tasks between reconnect attempts, then we won't
* need to wait as long as a full delay.
*
* This is a loop to protect against interrupted sleeps.
*/
if (us_since_last_attempt < shard->delay_us)
while (us_since_last_attempt < shard->delay_us)
{
pg_usleep(shard->delay_us - us_since_last_attempt);
/* At least we should handle cancellations here */
CHECK_FOR_INTERRUPTS();
now = GetCurrentTimestamp();
us_since_last_attempt = (int64) (now - shard->last_reconnect_time);
}
/* update the delay metric */
shard->delay_us = Min(shard->delay_us * 2, MAX_RECONNECT_INTERVAL_USEC);
shard->last_reconnect_time = now;
/*
* Connect using the connection string we got from the

View File

@@ -1179,7 +1179,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
blocknum += iterblocks;
}
communicator_prefetch_pump_state(false);
communicator_prefetch_pump_state();
return false;
}
@@ -1218,7 +1218,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
communicator_prefetch_register_bufferv(tag, NULL, 1, NULL);
communicator_prefetch_pump_state(false);
communicator_prefetch_pump_state();
return false;
}
@@ -1262,7 +1262,7 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum,
*/
neon_log(SmgrTrace, "writeback noop");
communicator_prefetch_pump_state(false);
communicator_prefetch_pump_state();
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
@@ -1281,75 +1281,24 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
communicator_read_at_lsnv(rinfo, forkNum, blkno, &request_lsns, &buffer, 1, NULL);
}
#if PG_MAJORVERSION_NUM < 17
/*
* neon_read() -- Read the specified block from a relation.
*/
#if PG_MAJORVERSION_NUM < 16
static void
neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, char *buffer)
#else
static void
neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer)
#endif
{
neon_request_lsns request_lsns;
bits8 present;
void *bufferp;
switch (reln->smgr_relpersistence)
{
case 0:
neon_log(ERROR, "cannot call smgrread() on rel with unknown persistence");
case RELPERSISTENCE_PERMANENT:
break;
case RELPERSISTENCE_TEMP:
case RELPERSISTENCE_UNLOGGED:
mdread(reln, forkNum, blkno, buffer);
return;
default:
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
/* Try to read PS results if they are available */
communicator_prefetch_pump_state(false);
neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1);
present = 0;
bufferp = buffer;
if (communicator_prefetch_lookupv(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1, &bufferp, &present))
{
/* Prefetch hit */
return;
}
/* Try to read from local file cache */
if (lfc_read(InfoFromSMgrRel(reln), forkNum, blkno, buffer))
{
MyNeonCounters->file_cache_hits_total++;
return;
}
neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsns, buffer);
/*
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
*/
communicator_prefetch_pump_state(false);
#ifdef DEBUG_COMPARE_LOCAL
static void
compare_with_local(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void* buffer, XLogRecPtr request_lsn)
{
if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
{
char pageserver_masked[BLCKSZ];
PGIOAlignedBlock mdbuf;
PGIOAlignedBlock mdbuf_masked;
XLogRecPtr request_lsn = request_lsns.request_lsn;
#if PG_MAJORVERSION_NUM >= 17
{
void* mdbuffers[1] = { mdbuf.data };
mdreadv(reln, forkNum, blkno, mdbuffers, 1);
}
#else
mdread(reln, forkNum, blkno, mdbuf.data);
#endif
memcpy(pageserver_masked, buffer, BLCKSZ);
memcpy(mdbuf_masked.data, mdbuf.data, BLCKSZ);
@@ -1413,11 +1362,105 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
}
}
}
}
#endif
#if PG_MAJORVERSION_NUM < 17
/*
* neon_read() -- Read the specified block from a relation.
*/
#if PG_MAJORVERSION_NUM < 16
static void
neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, char *buffer)
#else
static void
neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer)
#endif
{
neon_request_lsns request_lsns;
bits8 present;
void *bufferp;
switch (reln->smgr_relpersistence)
{
case 0:
neon_log(ERROR, "cannot call smgrread() on rel with unknown persistence");
case RELPERSISTENCE_PERMANENT:
break;
case RELPERSISTENCE_TEMP:
case RELPERSISTENCE_UNLOGGED:
mdread(reln, forkNum, blkno, buffer);
return;
default:
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
/* Try to read PS results if they are available */
communicator_prefetch_pump_state();
neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1);
present = 0;
bufferp = buffer;
if (communicator_prefetch_lookupv(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1, &bufferp, &present))
{
/* Prefetch hit */
#ifdef DEBUG_COMPARE_LOCAL
compare_with_local(reln, forkNum, blkno, buffer, request_lsns.request_lsn);
#else
return;
#endif
}
/* Try to read from local file cache */
if (lfc_read(InfoFromSMgrRel(reln), forkNum, blkno, buffer))
{
MyNeonCounters->file_cache_hits_total++;
#ifdef DEBUG_COMPARE_LOCAL
compare_with_local(reln, forkNum, blkno, buffer, request_lsns.request_lsn);
#else
return;
#endif
}
neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsns, buffer);
/*
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
*/
communicator_prefetch_pump_state();
#ifdef DEBUG_COMPARE_LOCAL
compare_with_local(reln, forkNum, blkno, buffer, request_lsns.request_lsn);
#endif
}
#endif /* PG_MAJORVERSION_NUM <= 16 */
#if PG_MAJORVERSION_NUM >= 17
#ifdef DEBUG_COMPARE_LOCAL
static void
compare_with_localv(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void** buffers, BlockNumber nblocks, neon_request_lsns* request_lsns, bits8* read_pages)
{
if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
{
for (BlockNumber i = 0; i < nblocks; i++)
{
if (BITMAP_ISSET(read_pages, i))
{
compare_with_local(reln, forkNum, blkno + i, buffers[i], request_lsns[i].request_lsn);
}
}
}
}
#endif
static void
neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
void **buffers, BlockNumber nblocks)
@@ -1449,7 +1492,7 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
nblocks, PG_IOV_MAX);
/* Try to read PS results if they are available */
communicator_prefetch_pump_state(false);
communicator_prefetch_pump_state();
neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum,
request_lsns, nblocks);
@@ -1460,8 +1503,13 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
blocknum, request_lsns, nblocks,
buffers, read_pages);
#ifdef DEBUG_COMPARE_LOCAL
compare_with_localv(reln, forknum, blocknum, buffers, nblocks, request_lsns, read_pages);
memset(read_pages, 0, sizeof(read_pages));
#else
if (prefetch_result == nblocks)
return;
#endif
/* Try to read from local file cache */
lfc_result = lfc_readv_select(InfoFromSMgrRel(reln), forknum, blocknum, buffers,
@@ -1470,9 +1518,14 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
if (lfc_result > 0)
MyNeonCounters->file_cache_hits_total += lfc_result;
#ifdef DEBUG_COMPARE_LOCAL
compare_with_localv(reln, forknum, blocknum, buffers, nblocks, request_lsns, read_pages);
memset(read_pages, 0, sizeof(read_pages));
#else
/* Read all blocks from LFC, so we're done */
if (prefetch_result + lfc_result == nblocks)
return;
#endif
communicator_read_at_lsnv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns,
buffers, nblocks, read_pages);
@@ -1480,94 +1533,11 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
/*
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
*/
communicator_prefetch_pump_state(false);
communicator_prefetch_pump_state();
#ifdef DEBUG_COMPARE_LOCAL
if (forknum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
{
char pageserver_masked[BLCKSZ];
PGIOAlignedBlock mdbuf;
PGIOAlignedBlock mdbuf_masked;
XLogRecPtr request_lsn = request_lsns->request_lsn;
for (int i = 0; i < nblocks; i++)
{
BlockNumber blkno = blocknum + i;
if (!BITMAP_ISSET(read_pages, i))
continue;
#if PG_MAJORVERSION_NUM >= 17
{
void* mdbuffers[1] = { mdbuf.data };
mdreadv(reln, forknum, blkno, mdbuffers, 1);
}
#else
mdread(reln, forknum, blkno, mdbuf.data);
#endif
memcpy(pageserver_masked, buffers[i], BLCKSZ);
memcpy(mdbuf_masked.data, mdbuf.data, BLCKSZ);
if (PageIsNew((Page) mdbuf.data))
{
if (!PageIsNew((Page) pageserver_masked))
{
neon_log(PANIC, "page is new in MD but not in Page Server at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n%s\n",
blkno,
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum,
(uint32) (request_lsn >> 32), (uint32) request_lsn,
hexdump_page(buffers[i]));
}
}
else if (PageIsNew((Page) buffers[i]))
{
neon_log(PANIC, "page is new in Page Server but not in MD at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n%s\n",
blkno,
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum,
(uint32) (request_lsn >> 32), (uint32) request_lsn,
hexdump_page(mdbuf.data));
}
else if (PageGetSpecialSize(mdbuf.data) == 0)
{
/* assume heap */
RmgrTable[RM_HEAP_ID].rm_mask(mdbuf_masked.data, blkno);
RmgrTable[RM_HEAP_ID].rm_mask(pageserver_masked, blkno);
if (memcmp(mdbuf_masked.data, pageserver_masked, BLCKSZ) != 0)
{
neon_log(PANIC, "heap buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n",
blkno,
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum,
(uint32) (request_lsn >> 32), (uint32) request_lsn,
hexdump_page(mdbuf_masked.data),
hexdump_page(pageserver_masked));
}
}
else if (PageGetSpecialSize(mdbuf.data) == MAXALIGN(sizeof(BTPageOpaqueData)))
{
if (((BTPageOpaqueData *) PageGetSpecialPointer(mdbuf.data))->btpo_cycleid < MAX_BT_CYCLE_ID)
{
/* assume btree */
RmgrTable[RM_BTREE_ID].rm_mask(mdbuf_masked.data, blkno);
RmgrTable[RM_BTREE_ID].rm_mask(pageserver_masked, blkno);
if (memcmp(mdbuf_masked.data, pageserver_masked, BLCKSZ) != 0)
{
neon_log(PANIC, "btree buffers differ at blk %u in rel %u/%u/%u fork %u (request LSN %X/%08X):\n------ MD ------\n%s\n------ Page Server ------\n%s\n",
blkno,
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum,
(uint32) (request_lsn >> 32), (uint32) request_lsn,
hexdump_page(mdbuf_masked.data),
hexdump_page(pageserver_masked));
}
}
}
}
}
memset(read_pages, 0xFF, sizeof(read_pages));
compare_with_localv(reln, forknum, blocknum, buffers, nblocks, request_lsns, read_pages);
#endif
}
#endif
@@ -1665,7 +1635,7 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
lfc_write(InfoFromSMgrRel(reln), forknum, blocknum, buffer);
communicator_prefetch_pump_state(false);
communicator_prefetch_pump_state();
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
@@ -1727,7 +1697,7 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
lfc_writev(InfoFromSMgrRel(reln), forknum, blkno, buffers, nblocks);
communicator_prefetch_pump_state(false);
communicator_prefetch_pump_state();
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
@@ -1902,7 +1872,7 @@ neon_immedsync(SMgrRelation reln, ForkNumber forknum)
neon_log(SmgrTrace, "[NEON_SMGR] immedsync noop");
communicator_prefetch_pump_state(false);
communicator_prefetch_pump_state();
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))

View File

@@ -1,6 +1,10 @@
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
#[allow(non_upper_case_globals)]
#[unsafe(export_name = "malloc_conf")]
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:21\0";
#[tokio::main]
async fn main() -> anyhow::Result<()> {
proxy::binary::proxy::run().await

View File

@@ -423,8 +423,8 @@ async fn refresh_config_inner(
if let Some(tls_config) = data.tls {
let tls_config = tokio::task::spawn_blocking(move || {
crate::tls::server_config::configure_tls(
&tls_config.key_path,
&tls_config.cert_path,
tls_config.key_path.as_ref(),
tls_config.cert_path.as_ref(),
None,
false,
)

View File

@@ -1,8 +1,10 @@
/// A stand-alone program that routes connections, e.g. from
/// `aaa--bbb--1234.external.domain` to `aaa.bbb.internal.domain:1234`.
///
/// This allows connecting to pods/services running in the same Kubernetes cluster from
/// the outside. Similar to an ingress controller for HTTPS.
//! A stand-alone program that routes connections, e.g. from
//! `aaa--bbb--1234.external.domain` to `aaa.bbb.internal.domain:1234`.
//!
//! This allows connecting to pods/services running in the same Kubernetes cluster from
//! the outside. Similar to an ingress controller for HTTPS.
use std::path::Path;
use std::{net::SocketAddr, sync::Arc};
use anyhow::{Context, anyhow, bail, ensure};
@@ -86,46 +88,7 @@ pub async fn run() -> anyhow::Result<()> {
args.get_one::<String>("tls-key"),
args.get_one::<String>("tls-cert"),
) {
(Some(key_path), Some(cert_path)) => {
let key = {
let key_bytes = std::fs::read(key_path).context("TLS key file")?;
let mut keys =
rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..]).collect_vec();
ensure!(keys.len() == 1, "keys.len() = {} (should be 1)", keys.len());
PrivateKeyDer::Pkcs8(
keys.pop()
.expect("keys should not be empty")
.context(format!("Failed to read TLS keys at '{key_path}'"))?,
)
};
let cert_chain_bytes = std::fs::read(cert_path)
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
let cert_chain: Vec<_> = {
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
.try_collect()
.with_context(|| {
format!("Failed to read TLS certificate chain from bytes from file at '{cert_path}'.")
})?
};
// needed for channel bindings
let first_cert = cert_chain.first().context("missing certificate")?;
let tls_server_end_point = TlsServerEndPoint::new(first_cert)?;
let tls_config =
rustls::ServerConfig::builder_with_provider(Arc::new(ring::default_provider()))
.with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])
.context("ring should support TLS1.2 and TLS1.3")?
.with_no_client_auth()
.with_single_cert(cert_chain, key)?
.into();
(tls_config, tls_server_end_point)
}
(Some(key_path), Some(cert_path)) => parse_tls(key_path.as_ref(), cert_path.as_ref())?,
_ => bail!("tls-key and tls-cert must be specified"),
};
@@ -188,7 +151,58 @@ pub async fn run() -> anyhow::Result<()> {
match signal {}
}
async fn task_main(
pub(super) fn parse_tls(
key_path: &Path,
cert_path: &Path,
) -> anyhow::Result<(Arc<rustls::ServerConfig>, TlsServerEndPoint)> {
let key = {
let key_bytes = std::fs::read(key_path).context("TLS key file")?;
let mut keys = rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..]).collect_vec();
ensure!(keys.len() == 1, "keys.len() = {} (should be 1)", keys.len());
PrivateKeyDer::Pkcs8(
keys.pop()
.expect("keys should not be empty")
.context(format!(
"Failed to read TLS keys at '{}'",
key_path.display()
))?,
)
};
let cert_chain_bytes = std::fs::read(cert_path).context(format!(
"Failed to read TLS cert file at '{}.'",
cert_path.display()
))?;
let cert_chain: Vec<_> = {
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
.try_collect()
.with_context(|| {
format!(
"Failed to read TLS certificate chain from bytes from file at '{}'.",
cert_path.display()
)
})?
};
// needed for channel bindings
let first_cert = cert_chain.first().context("missing certificate")?;
let tls_server_end_point = TlsServerEndPoint::new(first_cert)?;
let tls_config =
rustls::ServerConfig::builder_with_provider(Arc::new(ring::default_provider()))
.with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])
.context("ring should support TLS1.2 and TLS1.3")?
.with_no_client_auth()
.with_single_cert(cert_chain, key)?
.into();
Ok((tls_config, tls_server_end_point))
}
pub(super) async fn task_main(
dest_suffix: Arc<String>,
tls_config: Arc<rustls::ServerConfig>,
compute_tls_config: Option<Arc<rustls::ClientConfig>>,

View File

@@ -1,9 +1,10 @@
use std::net::SocketAddr;
use std::path::PathBuf;
use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;
use anyhow::bail;
use anyhow::{bail, ensure};
use arc_swap::ArcSwapOption;
use futures::future::Either;
use remote_storage::RemoteStorageConfig;
@@ -62,18 +63,18 @@ struct ProxyCliArgs {
region: String,
/// listen for incoming client connections on ip:port
#[clap(short, long, default_value = "127.0.0.1:4432")]
proxy: String,
proxy: SocketAddr,
#[clap(value_enum, long, default_value_t = AuthBackendType::ConsoleRedirect)]
auth_backend: AuthBackendType,
/// listen for management callback connection on ip:port
#[clap(short, long, default_value = "127.0.0.1:7000")]
mgmt: String,
mgmt: SocketAddr,
/// listen for incoming http connections (metrics, etc) on ip:port
#[clap(long, default_value = "127.0.0.1:7001")]
http: String,
http: SocketAddr,
/// listen for incoming wss connections on ip:port
#[clap(long)]
wss: Option<String>,
wss: Option<SocketAddr>,
/// redirect unauthenticated users to the given uri in case of console redirect auth
#[clap(short, long, default_value = "http://localhost:3000/psql_session/")]
uri: String,
@@ -99,18 +100,18 @@ struct ProxyCliArgs {
///
/// tls-key and tls-cert are for backwards compatibility, we can put all certs in one dir
#[clap(short = 'k', long, alias = "ssl-key")]
tls_key: Option<String>,
tls_key: Option<PathBuf>,
/// path to TLS cert for client postgres connections
///
/// tls-key and tls-cert are for backwards compatibility, we can put all certs in one dir
#[clap(short = 'c', long, alias = "ssl-cert")]
tls_cert: Option<String>,
tls_cert: Option<PathBuf>,
/// Allow writing TLS session keys to the given file pointed to by the environment variable `SSLKEYLOGFILE`.
#[clap(long, alias = "allow-ssl-keylogfile")]
allow_tls_keylogfile: bool,
/// path to directory with TLS certificates for client postgres connections
#[clap(long)]
certs_dir: Option<String>,
certs_dir: Option<PathBuf>,
/// timeout for the TLS handshake
#[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
handshake_timeout: tokio::time::Duration,
@@ -229,6 +230,9 @@ struct ProxyCliArgs {
// TODO: rename to `console_redirect_confirmation_timeout`.
#[clap(long, default_value = "2m", value_parser = humantime::parse_duration)]
webauth_confirmation_timeout: std::time::Duration,
#[clap(flatten)]
pg_sni_router: PgSniRouterArgs,
}
#[derive(clap::Args, Clone, Copy, Debug)]
@@ -277,6 +281,25 @@ struct SqlOverHttpArgs {
sql_over_http_max_response_size_bytes: usize,
}
#[derive(clap::Args, Clone, Debug)]
struct PgSniRouterArgs {
/// listen for incoming client connections on ip:port
#[clap(id = "sni-router-listen", long, default_value = "127.0.0.1:4432")]
listen: SocketAddr,
/// listen for incoming client connections on ip:port, requiring TLS to compute
#[clap(id = "sni-router-listen-tls", long, default_value = "127.0.0.1:4433")]
listen_tls: SocketAddr,
/// path to TLS key for client postgres connections
#[clap(id = "sni-router-tls-key", long)]
tls_key: Option<PathBuf>,
/// path to TLS cert for client postgres connections
#[clap(id = "sni-router-tls-cert", long)]
tls_cert: Option<PathBuf>,
/// append this domain zone to the SNI hostname to get the destination address
#[clap(id = "sni-router-destination", long)]
dest: Option<String>,
}
pub async fn run() -> anyhow::Result<()> {
let _logging_guard = crate::logging::init().await?;
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
@@ -307,73 +330,51 @@ pub async fn run() -> anyhow::Result<()> {
Either::Right(auth_backend) => info!("Authentication backend: {auth_backend:?}"),
}
info!("Using region: {}", args.aws_region);
// TODO: untangle the config args
let regional_redis_client = match (args.redis_auth_type.as_str(), &args.redis_notifications) {
("plain", redis_url) => match redis_url {
None => {
bail!("plain auth requires redis_notifications to be set");
}
Some(url) => {
Some(ConnectionWithCredentialsProvider::new_with_static_credentials(url.clone()))
}
},
("irsa", _) => match (&args.redis_host, args.redis_port) {
(Some(host), Some(port)) => Some(
ConnectionWithCredentialsProvider::new_with_credentials_provider(
host.to_string(),
port,
elasticache::CredentialsProvider::new(
args.aws_region,
args.redis_cluster_name,
args.redis_user_id,
)
.await,
),
),
(None, None) => {
warn!(
"irsa auth requires redis-host and redis-port to be set, continuing without regional_redis_client"
);
None
}
_ => {
bail!("redis-host and redis-port must be specified together");
}
},
_ => {
bail!("unknown auth type given");
}
};
let redis_notifications_client = if let Some(url) = args.redis_notifications {
Some(ConnectionWithCredentialsProvider::new_with_static_credentials(url))
} else {
regional_redis_client.clone()
};
let (regional_redis_client, redis_notifications_client) = configure_redis(&args).await?;
// Check that we can bind to address before further initialization
let http_address: SocketAddr = args.http.parse()?;
info!("Starting http on {http_address}");
let http_listener = TcpListener::bind(http_address).await?.into_std()?;
info!("Starting http on {}", args.http);
let http_listener = TcpListener::bind(args.http).await?.into_std()?;
let mgmt_address: SocketAddr = args.mgmt.parse()?;
info!("Starting mgmt on {mgmt_address}");
let mgmt_listener = TcpListener::bind(mgmt_address).await?;
info!("Starting mgmt on {}", args.mgmt);
let mgmt_listener = TcpListener::bind(args.mgmt).await?;
let proxy_listener = if args.is_auth_broker {
None
} else {
let proxy_address: SocketAddr = args.proxy.parse()?;
info!("Starting proxy on {proxy_address}");
info!("Starting proxy on {}", args.proxy);
Some(TcpListener::bind(args.proxy).await?)
};
Some(TcpListener::bind(proxy_address).await?)
let sni_router_listeners = {
let args = &args.pg_sni_router;
if args.dest.is_some() {
ensure!(
args.tls_key.is_some(),
"sni-router-tls-key must be provided"
);
ensure!(
args.tls_cert.is_some(),
"sni-router-tls-cert must be provided"
);
info!(
"Starting pg-sni-router on {} and {}",
args.listen, args.listen_tls
);
Some((
TcpListener::bind(args.listen).await?,
TcpListener::bind(args.listen_tls).await?,
))
} else {
None
}
};
// TODO: rename the argument to something like serverless.
// It now covers more than just websockets, it also covers SQL over HTTP.
let serverless_listener = if let Some(serverless_address) = args.wss {
let serverless_address: SocketAddr = serverless_address.parse()?;
info!("Starting wss on {serverless_address}");
Some(TcpListener::bind(serverless_address).await?)
} else if args.is_auth_broker {
@@ -458,6 +459,37 @@ pub async fn run() -> anyhow::Result<()> {
}
}
// spawn pg-sni-router mode.
if let Some((listen, listen_tls)) = sni_router_listeners {
let args = args.pg_sni_router;
let dest = args.dest.expect("already asserted it is set");
let key_path = args.tls_key.expect("already asserted it is set");
let cert_path = args.tls_cert.expect("already asserted it is set");
let (tls_config, tls_server_end_point) =
super::pg_sni_router::parse_tls(&key_path, &cert_path)?;
let dest = Arc::new(dest);
client_tasks.spawn(super::pg_sni_router::task_main(
dest.clone(),
tls_config.clone(),
None,
tls_server_end_point,
listen,
cancellation_token.clone(),
));
client_tasks.spawn(super::pg_sni_router::task_main(
dest,
tls_config,
Some(config.connect_to_compute.tls.clone()),
tls_server_end_point,
listen_tls,
cancellation_token.clone(),
));
}
client_tasks.spawn(crate::context::parquet::worker(
cancellation_token.clone(),
args.parquet_upload,
@@ -565,7 +597,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
(Some(key_path), Some(cert_path)) => Some(config::configure_tls(
key_path,
cert_path,
args.certs_dir.as_ref(),
args.certs_dir.as_deref(),
args.allow_tls_keylogfile,
)?),
(None, None) => None,
@@ -811,6 +843,60 @@ fn build_auth_backend(
}
}
async fn configure_redis(
args: &ProxyCliArgs,
) -> anyhow::Result<(
Option<ConnectionWithCredentialsProvider>,
Option<ConnectionWithCredentialsProvider>,
)> {
// TODO: untangle the config args
let regional_redis_client = match (args.redis_auth_type.as_str(), &args.redis_notifications) {
("plain", redis_url) => match redis_url {
None => {
bail!("plain auth requires redis_notifications to be set");
}
Some(url) => {
Some(ConnectionWithCredentialsProvider::new_with_static_credentials(url.clone()))
}
},
("irsa", _) => match (&args.redis_host, args.redis_port) {
(Some(host), Some(port)) => Some(
ConnectionWithCredentialsProvider::new_with_credentials_provider(
host.to_string(),
port,
elasticache::CredentialsProvider::new(
args.aws_region.clone(),
args.redis_cluster_name.clone(),
args.redis_user_id.clone(),
)
.await,
),
),
(None, None) => {
// todo: upgrade to error?
warn!(
"irsa auth requires redis-host and redis-port to be set, continuing without regional_redis_client"
);
None
}
_ => {
bail!("redis-host and redis-port must be specified together");
}
},
_ => {
bail!("unknown auth type given");
}
};
let redis_notifications_client = if let Some(url) = &args.redis_notifications {
Some(ConnectionWithCredentialsProvider::new_with_static_credentials(&**url))
} else {
regional_redis_client.clone()
};
Ok((regional_redis_client, redis_notifications_client))
}
#[cfg(test)]
mod tests {
use std::time::Duration;

View File

@@ -6,12 +6,12 @@ use ipnet::{IpNet, Ipv4Net, Ipv6Net};
use postgres_client::CancelToken;
use postgres_client::tls::MakeTlsConnect;
use pq_proto::CancelKeyData;
use redis::{FromRedisValue, Pipeline, Value, pipe};
use redis::{Cmd, FromRedisValue, Value};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::net::TcpStream;
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, info, warn};
use tracing::{debug, error, info, warn};
use crate::auth::backend::ComputeUserInfo;
use crate::auth::{AuthError, check_peer_addr_is_in_list};
@@ -56,8 +56,70 @@ pub enum CancelKeyOp {
},
}
pub struct Pipeline {
inner: redis::Pipeline,
replies: Vec<CancelReplyOp>,
}
impl Pipeline {
fn with_capacity(n: usize) -> Self {
Self {
inner: redis::Pipeline::with_capacity(n),
replies: Vec::with_capacity(n),
}
}
async fn execute(&mut self, client: &mut RedisKVClient) {
let responses = self.replies.len();
let batch_size = self.inner.len();
match client.query(&self.inner).await {
// for each reply, we expect that many values.
Ok(Value::Array(values)) if values.len() == responses => {
debug!(
batch_size,
responses, "successfully completed cancellation jobs",
);
for (value, reply) in std::iter::zip(values, self.replies.drain(..)) {
reply.send_value(value);
}
}
Ok(value) => {
error!(batch_size, ?value, "unexpected redis return value");
for reply in self.replies.drain(..) {
reply.send_err(anyhow!("incorrect response type from redis"));
}
}
Err(err) => {
for reply in self.replies.drain(..) {
reply.send_err(anyhow!("could not send cmd to redis: {err}"));
}
}
}
self.inner.clear();
self.replies.clear();
}
fn add_command_with_reply(&mut self, cmd: Cmd, reply: CancelReplyOp) {
self.inner.add_command(cmd);
self.replies.push(reply);
}
fn add_command_no_reply(&mut self, cmd: Cmd) {
self.inner.add_command(cmd).ignore();
}
fn add_command(&mut self, cmd: Cmd, reply: Option<CancelReplyOp>) {
match reply {
Some(reply) => self.add_command_with_reply(cmd, reply),
None => self.add_command_no_reply(cmd),
}
}
}
impl CancelKeyOp {
fn register(self, pipe: &mut Pipeline) -> Option<CancelReplyOp> {
fn register(self, pipe: &mut Pipeline) {
#[allow(clippy::used_underscore_binding)]
match self {
CancelKeyOp::StoreCancelKey {
@@ -68,18 +130,18 @@ impl CancelKeyOp {
_guard,
expire,
} => {
pipe.hset(&key, field, value);
pipe.expire(key, expire);
let resp_tx = resp_tx?;
Some(CancelReplyOp::StoreCancelKey { resp_tx, _guard })
let reply =
resp_tx.map(|resp_tx| CancelReplyOp::StoreCancelKey { resp_tx, _guard });
pipe.add_command(Cmd::hset(&key, field, value), reply);
pipe.add_command_no_reply(Cmd::expire(key, expire));
}
CancelKeyOp::GetCancelData {
key,
resp_tx,
_guard,
} => {
pipe.hgetall(key);
Some(CancelReplyOp::GetCancelData { resp_tx, _guard })
let reply = CancelReplyOp::GetCancelData { resp_tx, _guard };
pipe.add_command_with_reply(Cmd::hgetall(key), reply);
}
CancelKeyOp::RemoveCancelKey {
key,
@@ -87,9 +149,9 @@ impl CancelKeyOp {
resp_tx,
_guard,
} => {
pipe.hdel(key, field);
let resp_tx = resp_tx?;
Some(CancelReplyOp::RemoveCancelKey { resp_tx, _guard })
let reply =
resp_tx.map(|resp_tx| CancelReplyOp::RemoveCancelKey { resp_tx, _guard });
pipe.add_command(Cmd::hdel(key, field), reply);
}
}
}
@@ -170,8 +232,8 @@ pub async fn handle_cancel_messages(
client: &mut RedisKVClient,
mut rx: mpsc::Receiver<CancelKeyOp>,
) -> anyhow::Result<()> {
let mut batch = Vec::new();
let mut replies = vec![];
let mut batch = Vec::with_capacity(BATCH_SIZE);
let mut pipeline = Pipeline::with_capacity(BATCH_SIZE);
loop {
if rx.recv_many(&mut batch, BATCH_SIZE).await == 0 {
@@ -182,42 +244,11 @@ pub async fn handle_cancel_messages(
let batch_size = batch.len();
debug!(batch_size, "running cancellation jobs");
let mut pipe = pipe();
for msg in batch.drain(..) {
if let Some(reply) = msg.register(&mut pipe) {
replies.push(reply);
} else {
pipe.ignore();
}
msg.register(&mut pipeline);
}
let responses = replies.len();
match client.query(pipe).await {
// for each reply, we expect that many values.
Ok(Value::Array(values)) if values.len() == responses => {
debug!(
batch_size,
responses, "successfully completed cancellation jobs",
);
for (value, reply) in std::iter::zip(values, replies.drain(..)) {
reply.send_value(value);
}
}
Ok(value) => {
debug!(?value, "unexpected redis return value");
for reply in replies.drain(..) {
reply.send_err(anyhow!("incorrect response type from redis"));
}
}
Err(err) => {
for reply in replies.drain(..) {
reply.send_err(anyhow!("could not send cmd to redis: {err}"));
}
}
}
replies.clear();
pipeline.execute(client).await;
}
}

View File

@@ -3,7 +3,7 @@ use std::net::TcpListener;
use std::sync::{Arc, Mutex};
use anyhow::{anyhow, bail};
use http_utils::endpoint::{self, request_span};
use http_utils::endpoint::{self, profile_cpu_handler, profile_heap_handler, request_span};
use http_utils::error::ApiError;
use http_utils::json::json_response;
use http_utils::{RouterBuilder, RouterService};
@@ -33,6 +33,12 @@ fn make_router(metrics: AppMetrics) -> RouterBuilder<hyper0::Body, ApiError> {
request_span(r, move |b| prometheus_metrics_handler(b, state))
})
.get("/v1/status", status_handler)
.get("/profile/cpu", move |r| {
request_span(r, profile_cpu_handler)
})
.get("/profile/heap", move |r| {
request_span(r, profile_heap_handler)
})
}
pub async fn task_main(

View File

@@ -47,7 +47,7 @@ impl RedisKVClient {
pub(crate) async fn query<T: FromRedisValue>(
&mut self,
q: impl Queryable,
q: &impl Queryable,
) -> anyhow::Result<T> {
if !self.limiter.check() {
tracing::info!("Rate limit exceeded. Skipping query");

View File

@@ -1,4 +1,5 @@
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::Arc;
use anyhow::{Context, bail};
@@ -21,9 +22,9 @@ pub struct TlsConfig {
/// Configure TLS for the main endpoint.
pub fn configure_tls(
key_path: &str,
cert_path: &str,
certs_dir: Option<&String>,
key_path: &Path,
cert_path: &Path,
certs_dir: Option<&Path>,
allow_tls_keylogfile: bool,
) -> anyhow::Result<TlsConfig> {
// add default certificate
@@ -39,8 +40,7 @@ pub fn configure_tls(
let key_path = path.join("tls.key");
let cert_path = path.join("tls.crt");
if key_path.exists() && cert_path.exists() {
cert_resolver
.add_cert_path(&key_path.to_string_lossy(), &cert_path.to_string_lossy())?;
cert_resolver.add_cert_path(&key_path, &cert_path)?;
}
}
}
@@ -86,7 +86,7 @@ pub struct CertResolver {
}
impl CertResolver {
fn parse_new(key_path: &str, cert_path: &str) -> anyhow::Result<Self> {
fn parse_new(key_path: &Path, cert_path: &Path) -> anyhow::Result<Self> {
let (priv_key, cert_chain) = parse_key_cert(key_path, cert_path)?;
Self::new(priv_key, cert_chain)
}
@@ -103,7 +103,7 @@ impl CertResolver {
Ok(Self { certs, default })
}
fn add_cert_path(&mut self, key_path: &str, cert_path: &str) -> anyhow::Result<()> {
fn add_cert_path(&mut self, key_path: &Path, cert_path: &Path) -> anyhow::Result<()> {
let (priv_key, cert_chain) = parse_key_cert(key_path, cert_path)?;
self.add_cert(priv_key, cert_chain)
}
@@ -124,26 +124,29 @@ impl CertResolver {
}
fn parse_key_cert(
key_path: &str,
cert_path: &str,
key_path: &Path,
cert_path: &Path,
) -> anyhow::Result<(PrivateKeyDer<'static>, Vec<CertificateDer<'static>>)> {
let priv_key = {
let key_bytes = std::fs::read(key_path)
.with_context(|| format!("Failed to read TLS keys at '{key_path}'"))?;
.with_context(|| format!("Failed to read TLS keys at '{}'", key_path.display()))?;
rustls_pemfile::private_key(&mut &key_bytes[..])
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
.with_context(|| format!("Failed to parse TLS keys at '{}'", key_path.display()))?
.with_context(|| format!("Failed to parse TLS keys at '{}'", key_path.display()))?
};
let cert_chain_bytes = std::fs::read(cert_path)
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
let cert_chain_bytes = std::fs::read(cert_path).context(format!(
"Failed to read TLS cert file at '{}.'",
cert_path.display()
))?;
let cert_chain = {
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
.try_collect()
.with_context(|| {
format!(
"Failed to read TLS certificate chain from bytes from file at '{cert_path}'."
"Failed to read TLS certificate chain from bytes from file at '{}'.",
cert_path.display()
)
})?
};

View File

@@ -468,12 +468,15 @@ pub async fn handle_request(
assert!(status.tenant_id == request.tenant_id);
assert!(status.timeline_id == request.timeline_id);
let check_tombstone = !request.ignore_tombstone.unwrap_or_default();
match pull_timeline(
status,
safekeeper_host,
sk_auth_token,
http_client,
global_timelines,
check_tombstone,
)
.await
{
@@ -499,6 +502,7 @@ async fn pull_timeline(
sk_auth_token: Option<SecretString>,
http_client: reqwest::Client,
global_timelines: Arc<GlobalTimelines>,
check_tombstone: bool,
) -> Result<PullTimelineResponse> {
let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
info!(
@@ -570,7 +574,7 @@ async fn pull_timeline(
// Finally, load the timeline.
let _tli = global_timelines
.load_temp_timeline(ttid, &tli_dir_path, false)
.load_temp_timeline(ttid, &tli_dir_path, check_tombstone)
.await?;
Ok(PullTimelineResponse {

View File

@@ -513,7 +513,7 @@ impl SafekeeperPostgresHandler {
let end_pos = end_watch.get();
if end_pos < start_pos {
warn!(
info!(
"requested start_pos {} is ahead of available WAL end_pos {}",
start_pos, end_pos
);

View File

@@ -157,6 +157,29 @@ async fn handle_validate(req: Request<Body>) -> Result<Response<Body>, ApiError>
json_response(StatusCode::OK, state.service.validate(validate_req).await?)
}
async fn handle_get_timeline_import_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::GenerationsApi)?;
let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
let req = match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
return res;
}
ForwardOutcome::NotForwarded(req) => req,
};
let state = get_state(&req);
json_response(
StatusCode::OK,
state
.service
.handle_timeline_shard_import_progress(tenant_shard_id, timeline_id)
.await?,
)
}
async fn handle_put_timeline_import_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::GenerationsApi)?;
@@ -2008,6 +2031,13 @@ pub fn make_router(
.post("/upcall/v1/validate", |r| {
named_request_span(r, handle_validate, RequestName("upcall_v1_validate"))
})
.get("/upcall/v1/timeline_import_status", |r| {
named_request_span(
r,
handle_get_timeline_import_status,
RequestName("upcall_v1_timeline_import_status"),
)
})
.post("/upcall/v1/timeline_import_status", |r| {
named_request_span(
r,

View File

@@ -1,3 +1,5 @@
use std::time::Duration;
use pageserver_api::models::detach_ancestor::AncestorDetached;
use pageserver_api::models::{
DetachBehavior, LocationConfig, LocationConfigListResponse, LsnLease, PageserverUtilization,
@@ -212,6 +214,7 @@ impl PageserverClient {
)
}
#[allow(unused)]
pub(crate) async fn timeline_detail(
&self,
tenant_shard_id: TenantShardId,
@@ -357,4 +360,20 @@ impl PageserverClient {
self.inner.wait_lsn(tenant_shard_id, request).await
)
}
pub(crate) async fn activate_post_import(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
timeline_activate_timeout: Duration,
) -> Result<TimelineInfo> {
measured_request!(
"activate_post_import",
crate::metrics::Method::Put,
&self.node_id_label,
self.inner
.activate_post_import(tenant_shard_id, timeline_id, timeline_activate_timeout)
.await
)
}
}

View File

@@ -1666,6 +1666,39 @@ impl Persistence {
}
}
pub(crate) async fn get_timeline_import(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> DatabaseResult<Option<TimelineImport>> {
use crate::schema::timeline_imports::dsl;
let persistent_import = self
.with_measured_conn(DatabaseOperation::ListTimelineImports, move |conn| {
Box::pin(async move {
let mut from_db: Vec<TimelineImportPersistence> = dsl::timeline_imports
.filter(dsl::tenant_id.eq(tenant_id.to_string()))
.filter(dsl::timeline_id.eq(timeline_id.to_string()))
.load(conn)
.await?;
if from_db.len() > 1 {
return Err(DatabaseError::Logical(format!(
"unexpected number of rows ({})",
from_db.len()
)));
}
Ok(from_db.pop())
})
})
.await?;
persistent_import
.map(TimelineImport::from_persistent)
.transpose()
.map_err(|err| DatabaseError::Logical(format!("failed to deserialize import: {err}")))
}
pub(crate) async fn delete_timeline_import(
&self,
tenant_id: TenantId,

View File

@@ -35,12 +35,12 @@ use pageserver_api::controller_api::{
};
use pageserver_api::models::{
self, DetachBehavior, LocationConfig, LocationConfigListResponse, LocationConfigMode, LsnLease,
PageserverUtilization, SecondaryProgress, ShardParameters, TenantConfig,
PageserverUtilization, SecondaryProgress, ShardImportStatus, ShardParameters, TenantConfig,
TenantConfigPatchRequest, TenantConfigRequest, TenantLocationConfigRequest,
TenantLocationConfigResponse, TenantShardLocation, TenantShardSplitRequest,
TenantShardSplitResponse, TenantSorting, TenantTimeTravelRequest,
TimelineArchivalConfigRequest, TimelineCreateRequest, TimelineCreateResponseStorcon,
TimelineInfo, TimelineState, TopTenantShardItem, TopTenantShardsRequest,
TimelineInfo, TopTenantShardItem, TopTenantShardsRequest,
};
use pageserver_api::shard::{
DEFAULT_STRIPE_SIZE, ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId,
@@ -61,6 +61,7 @@ use utils::completion::Barrier;
use utils::generation::Generation;
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::shard::ShardIndex;
use utils::sync::gate::{Gate, GateGuard};
use utils::{failpoint_support, pausable_failpoint};
@@ -98,7 +99,8 @@ use crate::tenant_shard::{
ScheduleOptimization, ScheduleOptimizationAction, TenantShard,
};
use crate::timeline_import::{
ShardImportStatuses, TimelineImport, TimelineImportState, UpcallClient,
ImportResult, ShardImportStatuses, TimelineImport, TimelineImportFinalizeError,
TimelineImportState, UpcallClient,
};
const WAITER_FILL_DRAIN_POLL_TIMEOUT: Duration = Duration::from_millis(500);
@@ -3886,10 +3888,10 @@ impl Service {
None
} else if safekeepers {
// Note that we do not support creating the timeline on the safekeepers
// for imported timelines. The `start_lsn` of the timeline is not known
// until the import finshes.
// https://github.com/neondatabase/neon/issues/11569
// Note that for imported timelines, we do not create the timeline on the safekeepers
// straight away. Instead, we do it once the import finalized such that we know what
// start LSN to provide for the safekeepers. This is done in
// [`Self::finalize_timeline_import`].
let res = self
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info)
.instrument(tracing::info_span!("timeline_create_safekeepers", %tenant_id, timeline_id=%timeline_info.timeline_id))
@@ -3905,6 +3907,38 @@ impl Service {
})
}
pub(crate) async fn handle_timeline_shard_import_progress(
self: &Arc<Self>,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> Result<ShardImportStatus, ApiError> {
let maybe_import = self
.persistence
.get_timeline_import(tenant_shard_id.tenant_id, timeline_id)
.await?;
let import = maybe_import.ok_or_else(|| {
ApiError::NotFound(
format!(
"import for {}/{} not found",
tenant_shard_id.tenant_id, timeline_id
)
.into(),
)
})?;
import
.shard_statuses
.0
.get(&tenant_shard_id.to_index())
.cloned()
.ok_or_else(|| {
ApiError::NotFound(
format!("shard {} not found", tenant_shard_id.shard_slug()).into(),
)
})
}
pub(crate) async fn handle_timeline_shard_import_progress_upcall(
self: &Arc<Self>,
req: PutTimelineImportStatusRequest,
@@ -3943,6 +3977,16 @@ impl Service {
Ok(())
}
/// Finalize the import of a timeline
///
/// This method should be called once all shards have reported that the import is complete.
/// Firstly, it polls the post import timeline activation endpoint exposed by the pageserver.
/// Once the timeline is active on all shards, the timeline also gets created on the
/// safekeepers. Finally, notify cplane of the import completion (whether failed or
/// successful), and remove the import from the database and in-memory.
///
/// If this method gets pre-empted by shut down, it will be called again at start-up (on-going
/// imports are stored in the database).
#[instrument(skip_all, fields(
tenant_id=%import.tenant_id,
shard_id=%import.timeline_id,
@@ -3950,48 +3994,80 @@ impl Service {
async fn finalize_timeline_import(
self: &Arc<Self>,
import: TimelineImport,
) -> anyhow::Result<()> {
) -> Result<(), TimelineImportFinalizeError> {
tracing::info!("Finalizing timeline import");
pausable_failpoint!("timeline-import-pre-cplane-notification");
let import_failed = import.completion_error().is_some();
let tenant_id = import.tenant_id;
let timeline_id = import.timeline_id;
if !import_failed {
loop {
if self.cancel.is_cancelled() {
anyhow::bail!("Shut down requested while finalizing import");
}
let active = self.timeline_active_on_all_shards(&import).await?;
match active {
true => {
tracing::info!("Timeline became active on all shards");
break;
}
false => {
tracing::info!("Timeline not active on all shards yet");
tokio::select! {
_ = self.cancel.cancelled() => {
anyhow::bail!("Shut down requested while finalizing import");
},
_ = tokio::time::sleep(Duration::from_secs(5)) => {}
};
}
}
let import_error = import.completion_error();
match import_error {
Some(err) => {
self.notify_cplane_and_delete_import(tenant_id, timeline_id, Err(err))
.await?;
tracing::warn!("Timeline import completed with shard errors");
Ok(())
}
}
None => match self.activate_timeline_post_import(&import).await {
Ok(timeline_info) => {
tracing::info!("Post import timeline activation complete");
if self.config.timelines_onto_safekeepers {
// Now that we know the start LSN of this timeline, create it on the
// safekeepers.
self.tenant_timeline_create_safekeepers_until_success(
import.tenant_id,
timeline_info,
)
.await?;
}
self.notify_cplane_and_delete_import(tenant_id, timeline_id, Ok(()))
.await?;
tracing::info!("Timeline import completed successfully");
Ok(())
}
Err(TimelineImportFinalizeError::ShuttingDown) => {
// We got pre-empted by shut down and will resume after the restart.
Err(TimelineImportFinalizeError::ShuttingDown)
}
Err(err) => {
// Any finalize error apart from shut down is permanent and requires us to notify
// cplane such that it can clean up.
tracing::error!("Import finalize failed with permanent error: {err}");
self.notify_cplane_and_delete_import(
tenant_id,
timeline_id,
Err(err.to_string()),
)
.await?;
Err(err)
}
},
}
}
async fn notify_cplane_and_delete_import(
self: &Arc<Self>,
tenant_id: TenantId,
timeline_id: TimelineId,
import_result: ImportResult,
) -> Result<(), TimelineImportFinalizeError> {
let import_failed = import_result.is_err();
tracing::info!(%import_failed, "Notifying cplane of import completion");
let client = UpcallClient::new(self.get_config(), self.cancel.child_token());
client.notify_import_complete(&import).await?;
client
.notify_import_complete(tenant_id, timeline_id, import_result)
.await
.map_err(|_err| TimelineImportFinalizeError::ShuttingDown)?;
if let Err(err) = self
.persistence
.delete_timeline_import(import.tenant_id, import.timeline_id)
.delete_timeline_import(tenant_id, timeline_id)
.await
{
tracing::warn!("Failed to delete timeline import entry from database: {err}");
@@ -4001,17 +4077,113 @@ impl Service {
.write()
.unwrap()
.tenants
.range_mut(TenantShardId::tenant_range(import.tenant_id))
.range_mut(TenantShardId::tenant_range(tenant_id))
.for_each(|(_id, shard)| shard.importing = TimelineImportState::Idle);
// TODO(vlad): Timeline creations in import mode do not return a correct initdb lsn,
// so we can't create the timeline on the safekeepers. Fix by moving creation here.
// https://github.com/neondatabase/neon/issues/11569
tracing::info!(%import_failed, "Timeline import complete");
Ok(())
}
/// Activate an imported timeline on all shards once the import is complete.
/// Returns the [`TimelineInfo`] reported by shard zero.
async fn activate_timeline_post_import(
self: &Arc<Self>,
import: &TimelineImport,
) -> Result<TimelineInfo, TimelineImportFinalizeError> {
const TIMELINE_ACTIVATE_TIMEOUT: Duration = Duration::from_millis(128);
let mut shards_to_activate: HashSet<ShardIndex> =
import.shard_statuses.0.keys().cloned().collect();
let mut shard_zero_timeline_info = None;
while !shards_to_activate.is_empty() {
if self.cancel.is_cancelled() {
return Err(TimelineImportFinalizeError::ShuttingDown);
}
let targets = {
let locked = self.inner.read().unwrap();
let mut targets = Vec::new();
for (tenant_shard_id, shard) in locked
.tenants
.range(TenantShardId::tenant_range(import.tenant_id))
{
if !import
.shard_statuses
.0
.contains_key(&tenant_shard_id.to_index())
{
return Err(TimelineImportFinalizeError::MismatchedShards(
tenant_shard_id.to_index(),
));
}
if let Some(node_id) = shard.intent.get_attached() {
let node = locked
.nodes
.get(node_id)
.expect("Pageservers may not be deleted while referenced");
targets.push((*tenant_shard_id, node.clone()));
}
}
targets
};
let targeted_tenant_shards: Vec<_> = targets.iter().map(|(tid, _node)| *tid).collect();
let results = self
.tenant_for_shards_api(
targets,
|tenant_shard_id, client| async move {
client
.activate_post_import(
tenant_shard_id,
import.timeline_id,
TIMELINE_ACTIVATE_TIMEOUT,
)
.await
},
1,
1,
SHORT_RECONCILE_TIMEOUT,
&self.cancel,
)
.await;
let mut failed = 0;
for (tid, result) in targeted_tenant_shards.iter().zip(results.into_iter()) {
match result {
Ok(ok) => {
if tid.is_shard_zero() {
shard_zero_timeline_info = Some(ok);
}
shards_to_activate.remove(&tid.to_index());
}
Err(_err) => {
failed += 1;
}
}
}
if failed > 0 {
tracing::info!(
"Failed to activate timeline on {failed} shards post import. Will retry"
);
}
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(250)) => {},
_ = self.cancel.cancelled() => {
return Err(TimelineImportFinalizeError::ShuttingDown);
}
}
}
Ok(shard_zero_timeline_info.expect("All shards replied"))
}
async fn finalize_timeline_imports(self: &Arc<Self>, imports: Vec<TimelineImport>) {
futures::future::join_all(
imports
@@ -4021,61 +4193,6 @@ impl Service {
.await;
}
async fn timeline_active_on_all_shards(
self: &Arc<Self>,
import: &TimelineImport,
) -> anyhow::Result<bool> {
let targets = {
let locked = self.inner.read().unwrap();
let mut targets = Vec::new();
for (tenant_shard_id, shard) in locked
.tenants
.range(TenantShardId::tenant_range(import.tenant_id))
{
if !import
.shard_statuses
.0
.contains_key(&tenant_shard_id.to_index())
{
anyhow::bail!("Shard layout change detected on completion");
}
if let Some(node_id) = shard.intent.get_attached() {
let node = locked
.nodes
.get(node_id)
.expect("Pageservers may not be deleted while referenced");
targets.push((*tenant_shard_id, node.clone()));
} else {
return Ok(false);
}
}
targets
};
let results = self
.tenant_for_shards_api(
targets,
|tenant_shard_id, client| async move {
client
.timeline_detail(tenant_shard_id, import.timeline_id)
.await
},
1,
1,
SHORT_RECONCILE_TIMEOUT,
&self.cancel,
)
.await;
Ok(results.into_iter().all(|res| match res {
Ok(info) => info.state == TimelineState::Active,
Err(_) => false,
}))
}
pub(crate) async fn tenant_timeline_archival_config(
&self,
tenant_id: TenantId,

View File

@@ -1,4 +1,9 @@
use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
use std::{
collections::HashMap,
str::FromStr,
sync::{Arc, atomic::AtomicU64},
time::Duration,
};
use clashmap::{ClashMap, Entry};
use safekeeper_api::models::PullTimelineRequest;
@@ -169,10 +174,17 @@ pub(crate) struct ScheduleRequest {
pub(crate) kind: SafekeeperTimelineOpKind,
}
/// A way to keep ongoing/queued reconcile requests apart
#[derive(Copy, Clone, PartialEq, Eq)]
struct TokenId(u64);
type OngoingTokens = ClashMap<(TenantId, Option<TimelineId>), (CancellationToken, TokenId)>;
/// Handle to per safekeeper reconciler.
struct ReconcilerHandle {
tx: UnboundedSender<(ScheduleRequest, CancellationToken)>,
ongoing_tokens: Arc<ClashMap<(TenantId, Option<TimelineId>), CancellationToken>>,
tx: UnboundedSender<(ScheduleRequest, CancellationToken, TokenId)>,
ongoing_tokens: Arc<OngoingTokens>,
token_id_counter: AtomicU64,
cancel: CancellationToken,
}
@@ -185,24 +197,28 @@ impl ReconcilerHandle {
&self,
tenant_id: TenantId,
timeline_id: Option<TimelineId>,
) -> CancellationToken {
) -> (CancellationToken, TokenId) {
let token_id = self
.token_id_counter
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let token_id = TokenId(token_id);
let entry = self.ongoing_tokens.entry((tenant_id, timeline_id));
if let Entry::Occupied(entry) = &entry {
let cancel: &CancellationToken = entry.get();
let (cancel, _) = entry.get();
cancel.cancel();
}
entry.insert(self.cancel.child_token()).clone()
entry.insert((self.cancel.child_token(), token_id)).clone()
}
/// Cancel an ongoing reconciliation
fn cancel_reconciliation(&self, tenant_id: TenantId, timeline_id: Option<TimelineId>) {
if let Some((_, cancel)) = self.ongoing_tokens.remove(&(tenant_id, timeline_id)) {
if let Some((_, (cancel, _id))) = self.ongoing_tokens.remove(&(tenant_id, timeline_id)) {
cancel.cancel();
}
}
fn schedule_reconcile(&self, req: ScheduleRequest) {
let cancel = self.new_token_slot(req.tenant_id, req.timeline_id);
let (cancel, token_id) = self.new_token_slot(req.tenant_id, req.timeline_id);
let hostname = req.safekeeper.skp.host.clone();
if let Err(err) = self.tx.send((req, cancel)) {
if let Err(err) = self.tx.send((req, cancel, token_id)) {
tracing::info!("scheduling request onto {hostname} returned error: {err}");
}
}
@@ -211,13 +227,14 @@ impl ReconcilerHandle {
pub(crate) struct SafekeeperReconciler {
inner: SafekeeperReconcilerInner,
concurrency_limiter: Arc<Semaphore>,
rx: UnboundedReceiver<(ScheduleRequest, CancellationToken)>,
rx: UnboundedReceiver<(ScheduleRequest, CancellationToken, TokenId)>,
cancel: CancellationToken,
}
/// Thin wrapper over `Service` to not clutter its inherent functions
#[derive(Clone)]
struct SafekeeperReconcilerInner {
ongoing_tokens: Arc<OngoingTokens>,
service: Arc<Service>,
}
@@ -226,15 +243,20 @@ impl SafekeeperReconciler {
// We hold the ServiceInner lock so we don't want to make sending to the reconciler channel to be blocking.
let (tx, rx) = mpsc::unbounded_channel();
let concurrency = service.config.safekeeper_reconciler_concurrency;
let ongoing_tokens = Arc::new(ClashMap::new());
let mut reconciler = SafekeeperReconciler {
inner: SafekeeperReconcilerInner { service },
inner: SafekeeperReconcilerInner {
service,
ongoing_tokens: ongoing_tokens.clone(),
},
rx,
concurrency_limiter: Arc::new(Semaphore::new(concurrency)),
cancel: cancel.clone(),
};
let handle = ReconcilerHandle {
tx,
ongoing_tokens: Arc::new(ClashMap::new()),
ongoing_tokens,
token_id_counter: AtomicU64::new(0),
cancel,
};
tokio::spawn(async move { reconciler.run().await });
@@ -246,7 +268,9 @@ impl SafekeeperReconciler {
req = self.rx.recv() => req,
_ = self.cancel.cancelled() => break,
};
let Some((req, req_cancel)) = req else { break };
let Some((req, req_cancel, req_token_id)) = req else {
break;
};
let permit_res = tokio::select! {
req = self.concurrency_limiter.clone().acquire_owned() => req,
@@ -265,7 +289,7 @@ impl SafekeeperReconciler {
let timeline_id = req.timeline_id;
let node_id = req.safekeeper.skp.id;
inner
.reconcile_one(req, req_cancel)
.reconcile_one(req, req_cancel, req_token_id)
.instrument(tracing::info_span!(
"reconcile_one",
?kind,
@@ -280,8 +304,14 @@ impl SafekeeperReconciler {
}
impl SafekeeperReconcilerInner {
async fn reconcile_one(&self, req: ScheduleRequest, req_cancel: CancellationToken) {
async fn reconcile_one(
&self,
req: ScheduleRequest,
req_cancel: CancellationToken,
req_token_id: TokenId,
) {
let req_host = req.safekeeper.skp.host.clone();
let success;
match req.kind {
SafekeeperTimelineOpKind::Pull => {
let Some(timeline_id) = req.timeline_id else {
@@ -301,20 +331,24 @@ impl SafekeeperReconcilerInner {
http_hosts,
tenant_id: req.tenant_id,
timeline_id,
ignore_tombstone: Some(false),
};
self.reconcile_inner(
req,
async |client| client.pull_timeline(&pull_req).await,
|resp| {
if let Some(host) = resp.safekeeper_host {
tracing::info!("pulled timeline from {host} onto {req_host}");
} else {
tracing::info!("timeline already present on safekeeper on {req_host}");
}
},
req_cancel,
)
.await;
success = self
.reconcile_inner(
&req,
async |client| client.pull_timeline(&pull_req).await,
|resp| {
if let Some(host) = resp.safekeeper_host {
tracing::info!("pulled timeline from {host} onto {req_host}");
} else {
tracing::info!(
"timeline already present on safekeeper on {req_host}"
);
}
},
req_cancel,
)
.await;
}
SafekeeperTimelineOpKind::Exclude => {
// TODO actually exclude instead of delete here
@@ -325,22 +359,23 @@ impl SafekeeperReconcilerInner {
);
return;
};
self.reconcile_inner(
req,
async |client| client.delete_timeline(tenant_id, timeline_id).await,
|_resp| {
tracing::info!("deleted timeline from {req_host}");
},
req_cancel,
)
.await;
success = self
.reconcile_inner(
&req,
async |client| client.delete_timeline(tenant_id, timeline_id).await,
|_resp| {
tracing::info!("deleted timeline from {req_host}");
},
req_cancel,
)
.await;
}
SafekeeperTimelineOpKind::Delete => {
let tenant_id = req.tenant_id;
if let Some(timeline_id) = req.timeline_id {
let deleted = self
success = self
.reconcile_inner(
req,
&req,
async |client| client.delete_timeline(tenant_id, timeline_id).await,
|_resp| {
tracing::info!("deleted timeline from {req_host}");
@@ -348,13 +383,13 @@ impl SafekeeperReconcilerInner {
req_cancel,
)
.await;
if deleted {
if success {
self.delete_timeline_from_db(tenant_id, timeline_id).await;
}
} else {
let deleted = self
success = self
.reconcile_inner(
req,
&req,
async |client| client.delete_tenant(tenant_id).await,
|_resp| {
tracing::info!(%tenant_id, "deleted tenant from {req_host}");
@@ -362,12 +397,21 @@ impl SafekeeperReconcilerInner {
req_cancel,
)
.await;
if deleted {
if success {
self.delete_tenant_timelines_from_db(tenant_id).await;
}
}
}
}
if success {
self.ongoing_tokens.remove_if(
&(req.tenant_id, req.timeline_id),
|_ttid, (_cancel, token_id)| {
// Ensure that this request is indeed the request we just finished and not a new one
req_token_id == *token_id
},
);
}
}
async fn delete_timeline_from_db(&self, tenant_id: TenantId, timeline_id: TimelineId) {
match self
@@ -421,10 +465,10 @@ impl SafekeeperReconcilerInner {
self.delete_timeline_from_db(tenant_id, timeline_id).await;
}
}
/// Returns whether the reconciliation happened successfully
/// Returns whether the reconciliation happened successfully (or we got cancelled)
async fn reconcile_inner<T, F, U>(
&self,
req: ScheduleRequest,
req: &ScheduleRequest,
closure: impl Fn(SafekeeperClient) -> F,
log_success: impl FnOnce(T) -> U,
req_cancel: CancellationToken,

View File

@@ -10,6 +10,7 @@ use crate::persistence::{
DatabaseError, SafekeeperTimelineOpKind, TimelinePendingOpPersistence, TimelinePersistence,
};
use crate::safekeeper::Safekeeper;
use crate::timeline_import::TimelineImportFinalizeError;
use anyhow::Context;
use http_utils::error::ApiError;
use pageserver_api::controller_api::{
@@ -323,6 +324,42 @@ impl Service {
})
}
pub(crate) async fn tenant_timeline_create_safekeepers_until_success(
self: &Arc<Self>,
tenant_id: TenantId,
timeline_info: TimelineInfo,
) -> Result<(), TimelineImportFinalizeError> {
const BACKOFF: Duration = Duration::from_secs(5);
loop {
if self.cancel.is_cancelled() {
return Err(TimelineImportFinalizeError::ShuttingDown);
}
let res = self
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info)
.await;
match res {
Ok(_) => {
tracing::info!("Timeline created on safekeepers");
break;
}
Err(err) => {
tracing::error!("Failed to create timeline on safekeepers: {err}");
tokio::select! {
_ = self.cancel.cancelled() => {
return Err(TimelineImportFinalizeError::ShuttingDown);
},
_ = tokio::time::sleep(BACKOFF) => {}
};
}
}
}
Ok(())
}
/// Directly insert the timeline into the database without reconciling it with safekeepers.
///
/// Useful if the timeline already exists on the specified safekeepers,

View File

@@ -46,6 +46,14 @@ pub(crate) enum TimelineImportUpdateFollowUp {
None,
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum TimelineImportFinalizeError {
#[error("Shut down interrupted import finalize")]
ShuttingDown,
#[error("Mismatched shard detected during import finalize: {0}")]
MismatchedShards(ShardIndex),
}
pub(crate) enum TimelineImportUpdateError {
ImportNotFound {
tenant_id: TenantId,
@@ -151,6 +159,8 @@ impl TimelineImport {
}
}
pub(crate) type ImportResult = Result<(), String>;
pub(crate) struct UpcallClient {
authorization_header: Option<String>,
client: reqwest::Client,
@@ -198,7 +208,9 @@ impl UpcallClient {
/// eventual cplane availability. The cplane API is idempotent.
pub(crate) async fn notify_import_complete(
&self,
import: &TimelineImport,
tenant_id: TenantId,
timeline_id: TimelineId,
import_result: ImportResult,
) -> anyhow::Result<()> {
let endpoint = if self.base_url.ends_with('/') {
format!("{}import_complete", self.base_url)
@@ -206,15 +218,13 @@ impl UpcallClient {
format!("{}/import_complete", self.base_url)
};
tracing::info!("Endpoint is {endpoint}");
let request = self
.client
.request(Method::PUT, endpoint)
.json(&ImportCompleteRequest {
tenant_id: import.tenant_id,
timeline_id: import.timeline_id,
error: import.completion_error(),
tenant_id,
timeline_id,
error: import_result.err(),
})
.timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT);

View File

@@ -355,6 +355,7 @@ pub(crate) async fn list_timeline_blobs(
match res {
ListTimelineBlobsResult::Ready(data) => Ok(data),
ListTimelineBlobsResult::MissingIndexPart(_) => {
tracing::warn!("listing raced with removal of an index, retrying");
// Retry if listing raced with removal of an index
let data = list_timeline_blobs_impl(remote_client, id, root_target)
.await?
@@ -441,7 +442,7 @@ async fn list_timeline_blobs_impl(
}
if index_part_keys.is_empty() && s3_layers.is_empty() {
tracing::debug!("Timeline is empty: expected post-deletion state.");
tracing::info!("Timeline is empty: expected post-deletion state.");
if initdb_archive {
tracing::info!("Timeline is post deletion but initdb archive is still present.");
}

View File

@@ -593,6 +593,7 @@ async fn gc_timeline(
index_part_snapshot_time: _,
} => (index_part, *index_part_generation, data.unused_index_keys),
BlobDataParseResult::Relic => {
tracing::info!("Skipping timeline {ttid}, it is a relic");
// Post-deletion tenant location: don't try and GC it.
return Ok(summary);
}

View File

@@ -1274,6 +1274,8 @@ class NeonEnv:
if self.pageserver_virtual_file_io_engine is not None:
ps_cfg["virtual_file_io_engine"] = self.pageserver_virtual_file_io_engine
if self.pageserver_virtual_file_io_mode is not None:
ps_cfg["virtual_file_io_mode"] = self.pageserver_virtual_file_io_mode
if config.pageserver_default_tenant_config_compaction_algorithm is not None:
tenant_config = ps_cfg.setdefault("tenant_config", {})
tenant_config["compaction_algorithm"] = (
@@ -3605,6 +3607,8 @@ class NeonProxy(PgProtocol):
http_port: int,
mgmt_port: int,
external_http_port: int,
router_port: int,
router_tls_port: int,
auth_backend: NeonProxy.AuthBackend,
metric_collection_endpoint: str | None = None,
metric_collection_interval: str | None = None,
@@ -3621,6 +3625,8 @@ class NeonProxy(PgProtocol):
self.test_output_dir = test_output_dir
self.proxy_port = proxy_port
self.mgmt_port = mgmt_port
self.router_port = router_port
self.router_tls_port = router_tls_port
self.auth_backend = auth_backend
self.metric_collection_endpoint = metric_collection_endpoint
self.metric_collection_interval = metric_collection_interval
@@ -3635,6 +3641,14 @@ class NeonProxy(PgProtocol):
key_path = self.test_output_dir / "proxy.key"
generate_proxy_tls_certs("*.local.neon.build", key_path, crt_path)
# generate key for pg-sni-router.
# endpoint.namespace.local.neon.build resolves to 127.0.0.1
generate_proxy_tls_certs(
"endpoint.namespace.local.neon.build",
self.test_output_dir / "router.key",
self.test_output_dir / "router.crt",
)
args = [
str(self.neon_binpath / "proxy"),
*["--http", f"{self.host}:{self.http_port}"],
@@ -3644,6 +3658,11 @@ class NeonProxy(PgProtocol):
*["--sql-over-http-timeout", f"{self.http_timeout_seconds}s"],
*["-c", str(crt_path)],
*["-k", str(key_path)],
*["--sni-router-listen", f"{self.host}:{self.router_port}"],
*["--sni-router-listen-tls", f"{self.host}:{self.router_tls_port}"],
*["--sni-router-tls-cert", str(self.test_output_dir / "router.crt")],
*["--sni-router-tls-key", str(self.test_output_dir / "router.key")],
*["--sni-router-destination", "local.neon.build"],
*self.auth_backend.extra_args(),
]
@@ -3943,6 +3962,8 @@ def link_proxy(
proxy_port = port_distributor.get_port()
mgmt_port = port_distributor.get_port()
external_http_port = port_distributor.get_port()
router_port = port_distributor.get_port()
router_tls_port = port_distributor.get_port()
with NeonProxy(
neon_binpath=neon_binpath,
@@ -3950,6 +3971,8 @@ def link_proxy(
proxy_port=proxy_port,
http_port=http_port,
mgmt_port=mgmt_port,
router_port=router_port,
router_tls_port=router_tls_port,
external_http_port=external_http_port,
auth_backend=NeonProxy.Link(),
) as proxy:
@@ -3983,6 +4006,8 @@ def static_proxy(
mgmt_port = port_distributor.get_port()
http_port = port_distributor.get_port()
external_http_port = port_distributor.get_port()
router_port = port_distributor.get_port()
router_tls_port = port_distributor.get_port()
with NeonProxy(
neon_binpath=neon_binpath,
@@ -3990,6 +4015,8 @@ def static_proxy(
proxy_port=proxy_port,
http_port=http_port,
mgmt_port=mgmt_port,
router_port=router_port,
router_tls_port=router_tls_port,
external_http_port=external_http_port,
auth_backend=NeonProxy.Postgres(auth_endpoint),
) as proxy:
@@ -4613,7 +4640,10 @@ class EndpointFactory:
return self
def new_replica(
self, origin: Endpoint, endpoint_id: str, config_lines: list[str] | None = None
self,
origin: Endpoint,
endpoint_id: str | None = None,
config_lines: list[str] | None = None,
):
branch_name = origin.branch_name
assert origin in self.endpoints
@@ -4629,7 +4659,10 @@ class EndpointFactory:
)
def new_replica_start(
self, origin: Endpoint, endpoint_id: str, config_lines: list[str] | None = None
self,
origin: Endpoint,
endpoint_id: str | None = None,
config_lines: list[str] | None = None,
):
branch_name = origin.branch_name
assert origin in self.endpoints

View File

@@ -111,6 +111,13 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
".*stalling layer flushes for compaction backpressure.*",
".*layer roll waiting for flush due to compaction backpressure.*",
".*BatchSpanProcessor.*",
*(
[
r".*your platform is not a supported production platform, ignoing request for O_DIRECT; this could hide alignment bugs.*"
]
if sys.platform != "linux"
else []
),
)

View File

@@ -24,6 +24,7 @@ from fixtures.utils import (
skip_in_debug_build,
wait_until,
)
from fixtures.workload import Workload
from mypy_boto3_kms import KMSClient
from mypy_boto3_kms.type_defs import EncryptResponseTypeDef
from mypy_boto3_s3 import S3Client
@@ -97,6 +98,10 @@ def test_pgdata_import_smoke(
f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
)
if neon_env_builder.storage_controller_config is None:
neon_env_builder.storage_controller_config = {}
neon_env_builder.storage_controller_config["timelines_onto_safekeepers"] = True
env = neon_env_builder.init_start()
# The test needs LocalFs support, which is only built in testing mode.
@@ -125,9 +130,8 @@ def test_pgdata_import_smoke(
elif rel_block_size == RelBlockSize.TWO_STRPES_PER_SHARD:
target_relblock_size = (shard_count or 1) * stripe_size * 8192 * 2
elif rel_block_size == RelBlockSize.MULTIPLE_RELATION_SEGMENTS:
# Postgres uses a 1GiB segment size, fixed at compile time, so we must use >2GB of data
# to exercise multiple segments.
target_relblock_size = int(((2.333 * 1024 * 1024 * 1024) // 8192) * 8192)
segment_size = 16 * 1024 * 1024
target_relblock_size = segment_size * 8
else:
raise ValueError
@@ -286,34 +290,28 @@ def test_pgdata_import_smoke(
#
# validate that we can write
#
rw_endpoint = env.endpoints.create_start(
branch_name=import_branch_name,
endpoint_id="rw",
tenant_id=tenant_id,
config_lines=ep_config,
)
rw_endpoint.safe_psql("create table othertable(values text)")
rw_lsn = Lsn(rw_endpoint.safe_psql_scalar("select pg_current_wal_flush_lsn()"))
workload = Workload(env, tenant_id, timeline_id, branch_name=import_branch_name)
workload.init()
workload.write_rows(64)
workload.validate()
# TODO: consider using `class Workload` here
# to do compaction and whatnot?
rw_lsn = Lsn(workload.endpoint().safe_psql_scalar("select pg_current_wal_flush_lsn()"))
#
# validate that we can branch (important use case)
#
# ... at the tip
_ = env.create_branch(
child_timeline_id = env.create_branch(
new_branch_name="br-tip",
ancestor_branch_name=import_branch_name,
tenant_id=tenant_id,
ancestor_start_lsn=rw_lsn,
)
br_tip_endpoint = env.endpoints.create_start(
branch_name="br-tip", endpoint_id="br-tip-ro", tenant_id=tenant_id, config_lines=ep_config
)
validate_vanilla_equivalence(br_tip_endpoint)
br_tip_endpoint.safe_psql("select * from othertable")
child_workload = workload.branch(timeline_id=child_timeline_id, branch_name="br-tip")
child_workload.validate()
validate_vanilla_equivalence(child_workload.endpoint())
# ... at the initdb lsn
_ = env.create_branch(
@@ -330,7 +328,7 @@ def test_pgdata_import_smoke(
)
validate_vanilla_equivalence(br_initdb_endpoint)
with pytest.raises(psycopg2.errors.UndefinedTable):
br_initdb_endpoint.safe_psql("select * from othertable")
br_initdb_endpoint.safe_psql(f"select * from {workload.table}")
@run_only_on_default_postgres(reason="PG version is irrelevant here")
@@ -414,6 +412,88 @@ def test_import_completion_on_restart(
wait_until(cplane_notified)
@run_only_on_default_postgres(reason="PG version is irrelevant here")
def test_import_respects_tenant_shutdown(
neon_env_builder: NeonEnvBuilder, vanilla_pg: VanillaPostgres, make_httpserver: HTTPServer
):
"""
Validate that importing timelines respect the usual timeline life cycle:
1. Shut down on tenant shut-down and resumes upon re-attach
2. Deletion on timeline deletion (TODO)
"""
# Set up mock control plane HTTP server to listen for import completions
import_completion_signaled = Event()
def handler(request: Request) -> Response:
log.info(f"control plane /import_complete request: {request.json}")
import_completion_signaled.set()
return Response(json.dumps({}), status=200)
cplane_mgmt_api_server = make_httpserver
cplane_mgmt_api_server.expect_request(
"/storage/api/v1/import_complete", method="PUT"
).respond_with_handler(handler)
# Plug the cplane mock in
neon_env_builder.control_plane_hooks_api = (
f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
)
# The import will specifiy a local filesystem path mocking remote storage
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
vanilla_pg.start()
vanilla_pg.stop()
env = neon_env_builder.init_configs()
env.start()
importbucket_path = neon_env_builder.repo_dir / "test_import_completion_bucket"
mock_import_bucket(vanilla_pg, importbucket_path)
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
idempotency = ImportPgdataIdemptencyKey.random()
# Pause before sending the notification
failpoint_name = "import-timeline-pre-execute-pausable"
env.pageserver.http_client().configure_failpoints((failpoint_name, "pause"))
env.storage_controller.tenant_create(tenant_id)
env.storage_controller.timeline_create(
tenant_id,
{
"new_timeline_id": str(timeline_id),
"import_pgdata": {
"idempotency_key": str(idempotency),
"location": {"LocalFs": {"path": str(importbucket_path.absolute())}},
},
},
)
def hit_failpoint():
log.info("Checking log for pattern...")
try:
assert env.pageserver.log_contains(f".*at failpoint {failpoint_name}.*")
except Exception:
log.exception("Failed to find pattern in log")
raise
wait_until(hit_failpoint)
assert not import_completion_signaled.is_set()
# Restart the pageserver while an import job is in progress.
# This clears the failpoint and we expect that the import starts up afresh
# after the restart and eventually completes.
env.pageserver.stop()
env.pageserver.start()
def cplane_notified():
assert import_completion_signaled.is_set()
wait_until(cplane_notified)
def test_fast_import_with_pageserver_ingest(
test_output_dir,
vanilla_pg: VanillaPostgres,
@@ -521,7 +601,9 @@ def test_fast_import_with_pageserver_ingest(
env.neon_cli.mappings_map_branch(import_branch_name, tenant_id, timeline_id)
# Run fast_import
fast_import.set_aws_creds(mock_s3_server, {"RUST_LOG": "aws_config=debug,aws_sdk_kms=debug"})
fast_import.set_aws_creds(
mock_s3_server, {"RUST_LOG": "info,aws_config=debug,aws_sdk_kms=debug"}
)
pg_port = port_distributor.get_port()
fast_import.run_pgdata(pg_port=pg_port, s3prefix=f"s3://{bucket}/{key_prefix}")

View File

@@ -52,6 +52,8 @@ def proxy_with_metric_collector(
proxy_port = port_distributor.get_port()
mgmt_port = port_distributor.get_port()
external_http_port = port_distributor.get_port()
router_port = port_distributor.get_port()
router_tls_port = port_distributor.get_port()
(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
@@ -63,6 +65,8 @@ def proxy_with_metric_collector(
proxy_port=proxy_port,
http_port=http_port,
mgmt_port=mgmt_port,
router_port=router_port,
router_tls_port=router_tls_port,
external_http_port=external_http_port,
metric_collection_endpoint=metric_collection_endpoint,
metric_collection_interval=metric_collection_interval,

View File

@@ -27,8 +27,9 @@ from contextlib import closing
import psycopg2
import pytest
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, wait_for_last_flush_lsn, wait_replica_caughtup
from fixtures.neon_fixtures import NeonEnv, PgBin, wait_for_last_flush_lsn, wait_replica_caughtup
from fixtures.pg_version import PgVersion
from fixtures.utils import query_scalar, skip_on_postgres, wait_until
@@ -695,3 +696,110 @@ def test_replica_start_with_too_many_unused_xids(neon_simple_env: NeonEnv):
with secondary.cursor() as secondary_cur:
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (n_restarts,)
def test_ephemeral_endpoints_vacuum(neon_simple_env: NeonEnv, pg_bin: PgBin):
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
sql = """
CREATE TABLE CHAR_TBL(f1 char(4));
CREATE TABLE FLOAT8_TBL(f1 float8);
CREATE TABLE INT2_TBL(f1 int2);
CREATE TABLE INT4_TBL(f1 int4);
CREATE TABLE INT8_TBL(q1 int8, q2 int8);
CREATE TABLE POINT_TBL(f1 point);
CREATE TABLE TEXT_TBL (f1 text);
CREATE TABLE VARCHAR_TBL(f1 varchar(4));
CREATE TABLE onek (unique1 int4);
CREATE TABLE onek2 AS SELECT * FROM onek;
CREATE TABLE tenk1 (unique1 int4);
CREATE TABLE tenk2 AS SELECT * FROM tenk1;
CREATE TABLE person (name text, age int4,location point);
CREATE TABLE emp (salary int4, manager name) INHERITS (person);
CREATE TABLE student (gpa float8) INHERITS (person);
CREATE TABLE stud_emp ( percent int4) INHERITS (emp, student);
CREATE TABLE road (name text,thepath path);
CREATE TABLE ihighway () INHERITS (road);
CREATE TABLE shighway(surface text) INHERITS (road);
CREATE TABLE BOOLTBL3 (d text, b bool, o int);
CREATE TABLE booltbl4(isfalse bool, istrue bool, isnul bool);
DROP TABLE BOOLTBL3;
DROP TABLE BOOLTBL4;
CREATE TABLE ceil_floor_round (a numeric);
DROP TABLE ceil_floor_round;
CREATE TABLE width_bucket_test (operand_num numeric, operand_f8 float8);
DROP TABLE width_bucket_test;
CREATE TABLE num_input_test (n1 numeric);
CREATE TABLE num_variance (a numeric);
INSERT INTO num_variance VALUES (0);
CREATE TABLE snapshot_test (nr integer, snap txid_snapshot);
CREATE TABLE guid1(guid_field UUID, text_field TEXT DEFAULT(now()));
CREATE TABLE guid2(guid_field UUID, text_field TEXT DEFAULT(now()));
CREATE INDEX guid1_btree ON guid1 USING BTREE (guid_field);
CREATE INDEX guid1_hash ON guid1 USING HASH (guid_field);
TRUNCATE guid1;
DROP TABLE guid1;
DROP TABLE guid2 CASCADE;
CREATE TABLE numrange_test (nr NUMRANGE);
CREATE INDEX numrange_test_btree on numrange_test(nr);
CREATE TABLE numrange_test2(nr numrange);
CREATE INDEX numrange_test2_hash_idx on numrange_test2 using hash (nr);
INSERT INTO numrange_test2 VALUES('[, 5)');
CREATE TABLE textrange_test (tr text);
CREATE INDEX textrange_test_btree on textrange_test(tr);
CREATE TABLE test_range_gist(ir int4range);
CREATE INDEX test_range_gist_idx on test_range_gist using gist (ir);
DROP INDEX test_range_gist_idx;
CREATE INDEX test_range_gist_idx on test_range_gist using gist (ir);
CREATE TABLE test_range_spgist(ir int4range);
CREATE INDEX test_range_spgist_idx on test_range_spgist using spgist (ir);
DROP INDEX test_range_spgist_idx;
CREATE INDEX test_range_spgist_idx on test_range_spgist using spgist (ir);
CREATE TABLE test_range_elem(i int4);
CREATE INDEX test_range_elem_idx on test_range_elem (i);
CREATE INDEX ON test_range_elem using spgist(int4range(i,i+10));
DROP TABLE test_range_elem;
CREATE TABLE test_range_excl(room int4range, speaker int4range, during tsrange, exclude using gist (room with =, during with &&), exclude using gist (speaker with =, during with &&));
CREATE TABLE f_test(f text, i int);
CREATE TABLE i8r_array (f1 int, f2 text);
CREATE TYPE arrayrange as range (subtype=int4[]);
CREATE TYPE two_ints as (a int, b int);
DROP TYPE two_ints cascade;
CREATE TABLE text_support_test (t text);
CREATE TABLE TEMP_FLOAT (f1 FLOAT8);
CREATE TABLE TEMP_INT4 (f1 INT4);
CREATE TABLE TEMP_INT2 (f1 INT2);
CREATE TABLE TEMP_GROUP (f1 INT4, f2 INT4, f3 FLOAT8);
CREATE TABLE POLYGON_TBL(f1 polygon);
CREATE TABLE quad_poly_tbl (id int, p polygon);
INSERT INTO quad_poly_tbl SELECT (x - 1) * 100 + y, polygon(circle(point(x * 10, y * 10), 1 + (x + y) % 10)) FROM generate_series(1, 200) x, generate_series(1, 100) y;
CREATE TABLE quad_poly_tbl_ord_seq2 AS SELECT 1 FROM quad_poly_tbl;
CREATE TABLE quad_poly_tbl_ord_idx2 AS SELECT 1 FROM quad_poly_tbl;
"""
with endpoint.cursor() as cur:
lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
env.endpoints.create_start(branch_name="main", lsn=lsn)
log.info(f"lsn: {lsn}")
for line in sql.split("\n"):
if len(line.strip()) == 0 or line.startswith("--"):
continue
cur.execute(line)
lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
env.endpoints.create_start(branch_name="main", lsn=lsn)
log.info(f"lsn: {lsn}")
cur.execute("VACUUM FULL pg_class;")
for ep in env.endpoints.endpoints:
log.info(f"{ep.endpoint_id} / {ep.pg_port}")
pg_dump_command = ["pg_dumpall", "-f", f"/tmp/dump-{ep.endpoint_id}.sql"]
env_vars = {
"PGPORT": str(ep.pg_port),
"PGUSER": endpoint.default_options["user"],
"PGHOST": endpoint.default_options["host"],
}
pg_bin.run_capture(pg_dump_command, env=env_vars)

View File

@@ -39,3 +39,10 @@ def test_role_grants(neon_simple_env: NeonEnv):
res = cur.fetchall()
assert res == [(1,)], "select should not succeed"
# confirm that replicas can also ensure the grants are correctly set.
replica = env.endpoints.new_replica_start(endpoint)
replica_client = replica.http_client()
replica_client.set_role_grants(
"test_role_grants", "test_role", "test_schema", ["CREATE", "USAGE"]
)

View File

@@ -6,7 +6,7 @@ from typing import TYPE_CHECKING
import backoff
from fixtures.log_helper import log
from fixtures.neon_fixtures import PgProtocol, VanillaPostgres
from fixtures.neon_fixtures import NeonProxy, PgProtocol, VanillaPostgres
if TYPE_CHECKING:
from pathlib import Path
@@ -41,6 +41,7 @@ class PgSniRouter(PgProtocol):
self,
neon_binpath: Path,
port: int,
tls_port: int,
destination: str,
tls_cert: Path,
tls_key: Path,
@@ -53,6 +54,7 @@ class PgSniRouter(PgProtocol):
self.host = host
self.neon_binpath = neon_binpath
self.port = port
self.tls_port = tls_port
self.destination = destination
self.tls_cert = tls_cert
self.tls_key = tls_key
@@ -64,6 +66,7 @@ class PgSniRouter(PgProtocol):
args = [
str(self.neon_binpath / "pg_sni_router"),
*["--listen", f"127.0.0.1:{self.port}"],
*["--listen-tls", f"127.0.0.1:{self.tls_port}"],
*["--tls-cert", str(self.tls_cert)],
*["--tls-key", str(self.tls_key)],
*["--destination", self.destination],
@@ -127,10 +130,12 @@ def test_pg_sni_router(
pg_port = vanilla_pg.default_options["port"]
router_port = port_distributor.get_port()
router_tls_port = port_distributor.get_port()
with PgSniRouter(
neon_binpath=neon_binpath,
port=router_port,
tls_port=router_tls_port,
destination="local.neon.build",
tls_cert=test_output_dir / "router.crt",
tls_key=test_output_dir / "router.key",
@@ -146,3 +151,22 @@ def test_pg_sni_router(
hostaddr="127.0.0.1",
)
assert out[0][0] == 1
def test_pg_sni_router_in_proxy(
static_proxy: NeonProxy,
vanilla_pg: VanillaPostgres,
):
# static_proxy starts this.
assert vanilla_pg.is_running()
pg_port = vanilla_pg.default_options["port"]
out = static_proxy.safe_psql(
"select 1",
dbname="postgres",
sslmode="require",
host=f"endpoint--namespace--{pg_port}.local.neon.build",
hostaddr="127.0.0.1",
port=static_proxy.router_port,
)
assert out[0][0] == 1

16
vendor/revisions.json vendored
View File

@@ -1,18 +1,18 @@
{
"v17": [
"17.4",
"b763ab54b98d232a0959371ab1d07f06ed77c49e"
"17.5",
"e5374b72997b0afc8374137674e873f7a558120a"
],
"v16": [
"16.8",
"05ddf212e2e07b788b5c8b88bdcf98630941f6ae"
"16.9",
"bb5eee65ac753b5a66d255ec5fb4c0e33180e8fd"
],
"v15": [
"15.12",
"b838c8969b7c63f3e637a769656f5f36793b797c"
"15.13",
"052df87d338dc30687d0c96f1a4d9b6cb4882b2e"
],
"v14": [
"14.17",
"108856a4ae76be285b04497a0ed08fcbe60ddbe9"
"14.18",
"ead1e76bdcb71ef87f52f0610bd7333247f75179"
]
}

View File

@@ -39,8 +39,10 @@ env_logger = { version = "0.11" }
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
form_urlencoded = { version = "1" }
futures-channel = { version = "0.3", features = ["sink"] }
futures-core = { version = "0.3" }
futures-executor = { version = "0.3" }
futures-io = { version = "0.3" }
futures-task = { version = "0.3", default-features = false, features = ["std"] }
futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
generic-array = { version = "0.14", default-features = false, features = ["more_lengths", "zeroize"] }
getrandom = { version = "0.2", default-features = false, features = ["std"] }
@@ -70,6 +72,7 @@ num-traits = { version = "0.2", features = ["i128", "libm"] }
once_cell = { version = "1" }
p256 = { version = "0.13", features = ["jwk"] }
parquet = { version = "53", default-features = false, features = ["zstd"] }
percent-encoding = { version = "2" }
prost = { version = "0.13", features = ["no-recursion-limit", "prost-derive"] }
rand = { version = "0.8", features = ["small_rng"] }
regex = { version = "1" }