Compare commits

..

25 Commits

Author SHA1 Message Date
Conrad Ludgate
c0e1e1dd74 standardise logging 2025-06-25 15:54:26 +01:00
Conrad Ludgate
010cd34635 simplify error handling 2025-06-25 15:54:26 +01:00
Conrad Ludgate
11ffe4c86c remove unused error retries 2025-06-25 15:54:26 +01:00
Conrad Ludgate
d6a5085664 move h2::handshake outside of hypermechanism 2025-06-25 15:54:26 +01:00
Conrad Ludgate
16d9889a51 move authenticate outside of tokiomechanism 2025-06-25 15:54:26 +01:00
Conrad Ludgate
517a3d0d86 [proxy]: BatchQueue::call is not cancel safe - make it directly cancellation aware (#12345)
## Problem

https://github.com/neondatabase/cloud/issues/30539

If the current leader cancels the `call` function, then it has removed
the jobs from the queue, but will never finish sending the responses.
Because of this, it is not cancellation safe.

## Summary of changes

Document these functions as not cancellation safe. Move cancellation of
the queued jobs into the queue itself.

## Alternatives considered

1. We could spawn the task that runs the batch, since that won't get
cancelled.
* This requires `fn call(self: Arc<Self>)` or `fn call(&'static self)`.
2. We could add another scopeguard and return the requests back to the
queue.
* This requires that requests are always retry safe, and also requires
requests to be `Clone`.
2025-06-25 14:19:20 +00:00
Conrad Ludgate
27ca1e21be [console_redirect_proxy]: fix channel binding (#12238)
## Problem

While working more on TLS to compute, I realised that Console Redirect
-> pg-sni-router -> compute would break if channel binding was set to
prefer. This is because the channel binding data would differ between
Console Redirect -> pg-sni-router vs pg-sni-router -> compute.

I also noticed that I actually disabled channel binding in #12145, since
`connect_raw` would think that the connection didn't support TLS.

## Summary of changes

Make sure we specify the channel binding.
Make sure that `connect_raw` can see if we have TLS support.
2025-06-25 13:41:30 +00:00
Arpad Müller
1dc01c9bed Support cancellations of timelines with hanging ondemand downloads (#12330)
In `test_layer_download_cancelled_by_config_location`, we simulate hung
downloads via the `before-downloading-layer-stream-pausable` failpoint.
Then, we cancel a timeline via the `location_config` endpoint.

With the new default as of
https://github.com/neondatabase/neon/pull/11712, we would be creating
the timeline on safekeepers regardless if there have been writes or not,
and it turns out the test relied on the timeline not existing on
safekeepers, due to a cancellation bug:

* as established before, the test makes the read path hang
* the timeline cancellation function first cancels the walreceiver, and
only then cancels the timeline's token
* `WalIngest::new` is requesting a checkpoint, which hits the read path
* at cancellation time, we'd be hanging inside the read, not seeing the
cancellation of the walreceiver
* the test would time out due to the hang

This is probably also reproducible in the wild when there is S3
unavailabilies or bottlenecks. So we thought that it's worthwhile to fix
the hang issue. The approach chosen in the end involves the
`tokio::select` macro.

In PR 11712, we originally punted on the test due to the hang and opted
it out from the new default, but now we can use the new default.

Part of https://github.com/neondatabase/neon/issues/12299
2025-06-25 13:40:38 +00:00
Heikki Linnakangas
7c4c36f5ac Remove unnecessary separate installation of libpq (#12287)
`make install` compiles and installs libpq. Remove redundant separate
step to compile and install it.
2025-06-25 10:47:56 +00:00
Tristan Partin
a2d623696c Update pgaudit to latest versions (#12328)
These updates contain some bug fixes and are completely backwards
compatible with what we currently support in Neon.

Link: https://github.com/pgaudit/pgaudit/compare/1.6.2...1.6.3
Link: https://github.com/pgaudit/pgaudit/compare/1.7.0...1.7.1
Link: https://github.com/pgaudit/pgaudit/compare/16.0...16.1
Link: https://github.com/pgaudit/pgaudit/compare/17.0...17.1
Signed-off-by: Tristan Partin <tristan.partin@databricks.com>

Signed-off-by: Tristan Partin <tristan.partin@databricks.com>
2025-06-25 09:03:02 +00:00
Tristan Partin
aa75722010 Set pgaudit.log=none for monitoring connections (#12137)
pgaudit can spam logs due to all the monitoring that we do. Logs from
these connections are not necessary for HIPPA compliance, so we can stop
logging from those connections.

Part-of: https://github.com/neondatabase/cloud/issues/29574

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-06-24 17:42:23 +00:00
Matthias van de Meent
6c6de6382a Use enum-typed PG versions (#12317)
This makes it possible for the compiler to validate that a match block
matched all PostgreSQL versions we support.

## Problem
We did not have a complete picture about which places we had to test
against PG versions, and what format these versions were: The full PG
version ID format (Major/minor/bugfix `MMmmbb`) as transfered in
protocol messages, or only the Major release version (`MM`). This meant
type confusion was rampant.

With this change, it becomes easier to develop new version-dependent
features, by making type and niche confusion impossible.

## Summary of changes
Every use of `pg_version` is now typed as either `PgVersionId` (u32,
valued in decimal `MMmmbb`) or PgMajorVersion (an enum, with a value for
every major version we support, serialized and stored like a u32 with
the value of that major version)

---------

Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
2025-06-24 17:25:31 +00:00
Dmitry Savelev
158d84ea30 Switch the billing metrics storage format to ndjson. (#12338)
## Problem

The billing team wants to change the billing events pipeline and use a
common events format in S3 buckets across different event producers.

## Summary of changes

Change the events storage format for billing events from JSON to NDJSON.

Resolves: https://github.com/neondatabase/cloud/issues/29994
2025-06-24 15:36:36 +00:00
Conrad Ludgate
4dd9ca7b04 [proxy]: authenticate to compute after connect_to_compute (#12335)
## Problem

PGLB will do the connect_to_compute logic, neonkeeper will do the
session establishment logic. We should split it.

## Summary of changes

Moves postgres authentication to compute to a separate routine that
happens after connect_to_compute.
2025-06-24 14:15:36 +00:00
Arpad Müller
552249607d apply clippy fixes for 1.88.0 beta (#12331)
The 1.88.0 stable release is near (this Thursday). We'd like to fix most
warnings beforehand so that the compiler upgrade doesn't require
approval from too many teams.

This is therefore a preparation PR (like similar PRs before it).

There is a lot of changes for this release, mostly because the
`uninlined_format_args` lint has been added to the `style` lint group.
One can read more about the lint
[here](https://rust-lang.github.io/rust-clippy/master/#/uninlined_format_args).

The PR is the result of `cargo +beta clippy --fix` and `cargo fmt`. One
remaining warning is left for the proxy team.

---------

Co-authored-by: Conrad Ludgate <conrad@neon.tech>
2025-06-24 10:12:42 +00:00
Ivan Efremov
a29772bf6e Create proxy-bench periodic run in CI (#12242)
Currently run for test only via pushing to the test-proxy-bench branch.

Relates to the #22681
2025-06-24 09:54:43 +00:00
Arpad Müller
0efff1db26 Allow cancellation errors in tests that allow timeline deletion errors (#12315)
After merging of PR https://github.com/neondatabase/neon/pull/11712 we
saw some tests be flaky, with errors showing up about the timeline
having been cancelled instead of having been deleted. This is an outcome
that is inherently racy with the "has been deleted" error.

In some instances, https://github.com/neondatabase/neon/pull/11712 has
already added the error about the timeline having been cancelled. This
PR adds them to the remaining instances of
https://github.com/neondatabase/neon/pull/11712, fixing the flakiness.
2025-06-23 22:26:38 +00:00
Aleksandr Sarantsev
5eecde461d storcon: Fix migration for Attached(0) tenants (#12256)
## Problem

`Attached(0)` tenant migrations can get stuck if the heatmap file has
not been uploaded.

## Summary of Changes

- Added a test to reproduce the issue.
- Introduced a `kick_secondary_downloads` config flag:
  - Enabled in testing environments.
  - Disabled in production (and in the new test).
- Updated `Attached(0)` locations to consider the number of secondaries
in their intent when deciding whether to download the heatmap.
2025-06-23 18:55:26 +00:00
Alex Chi Z.
85164422d0 feat(pageserver): support force overriding feature flags (#12233)
## Problem

Part of #11813 

## Summary of changes

Add a test API to make it easier to manipulate the feature flags within
tests.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-06-23 17:31:53 +00:00
John Spray
6c3aba7c44 storcon: adjust AZ selection for heterogenous AZs (#12296)
## Problem

The scheduler uses total shards per AZ to select the AZ for newly
created or attached tenants.

This makes bad decisions when we have different node counts per AZ -- we
might have 2 very busy pageservers in one AZ, and 4 more lightly loaded
pageservers in other AZs, and the scheduler picks the busy pageservers
because the total shard count in their AZ is lower.

## Summary of changes

- Divide the shard count by the number of nodes in the AZ when scoring
in `get_az_for_new_tenant`

---------

Co-authored-by: John Spray <john.spray@databricks.com>
2025-06-23 15:50:31 +00:00
Erik Grinaker
68a175d545 test_runner: fix test_basebackup_with_high_slru_count gzip param (#12319)
The `--gzip-probability` parameter was removed in #12250. However,
`test_basebackup_with_high_slru_count` still uses it, and keeps failing.

This patch removes the use of the parameter (gzip is enabled by
default).
2025-06-23 15:33:45 +00:00
Alex Chi Z.
5e2c444525 fix(pageserver): reduce default feature flag refresh interval (#12246)
## Problem

Part of #11813 

## Summary of changes

The current interval is 30s and it costs a lot of $$$. This patch
reduced it to 600s refresh interval (which means that it takes 10min for
feature flags to propagate from UI to the pageserver). In the future we
can let storcon retrieve the feature flags and push it to pageservers.
We can consider creating a new release or we can postpone this to the
week after the next week.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-06-23 13:51:21 +00:00
Heikki Linnakangas
8d711229c1 ci: Fix bogus skipping of 'make all' step in CI (#12318)
The 'make all' step must run always. PR #12311 accidentally left the
condition in there to skip it if there were no changes in postgres v14
sources. That condition belonged to a whole different step that was
removed altogether in PR#12311, and the condition should've been removed
too.

Per CI failure:
https://github.com/neondatabase/neon/actions/runs/15820148967/job/44587394469
2025-06-23 13:23:33 +00:00
Vlad Lazar
0e490f3be7 pageserver: allow concurrent rw IO on in-mem layer (#12151)
## Problem

Previously, we couldn't read from an in-memory layer while a batch was
being written to it. Vice-versa, we couldn't write to it while there
was an on-going read.

## Summary of Changes

The goal of this change is to improve concurrency. Writes happened
through a &mut self method so the enforcement was at the type system
level.

We attempt to improve by:
1. Adding interior mutability to EphemeralLayer. This involves wrapping
   the buffered writer in a read-write lock.
2. Minimise the time that the read lock is held for. Only hold the read
   lock while reading from the buffers (recently flushed or pending
   flush). If we need to read from the file, drop the lock and allow IO
   to be concurrent.
   
The new benchmark variants with concurrent reads improve between 70 to
200 percent (against main).
Benchmark results are in this
[commit](891f094ce6).

## Future Changes

We can push the interior mutability into the buffered writer. The
mutable tail goes under a read lock, the flushed part goes into an
ArcSwap and then we can read from anything that is flushed _without_ any
locking.
2025-06-23 13:17:30 +00:00
Erik Grinaker
7e41ef1bec pageserver: set gRPC basebackup chunk size to 256 KB (#12314)
gRPC base backups send a stream of fixed-size 64KB chunks.

pagebench basebackup with compression enabled shows this to reduce
throughput:

* 64 KB: 55 RPS
* 128 KB: 69 RPS
* 256 KB: 73 RPS
* 1024 KB: 73 RPS

This patch sets the base backup chunk size to 256 KB.
2025-06-23 12:41:11 +00:00
165 changed files with 1964 additions and 1557 deletions

View File

@@ -189,7 +189,6 @@ jobs:
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v17_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
- name: Build all
if: steps.cache_pg_14.outputs.cache-hit != 'true'
# Note: the Makefile picks up BUILD_TYPE and CARGO_PROFILE from the env variables
run: mold -run make ${make_vars} all -j$(nproc) CARGO_BUILD_FLAGS="$CARGO_FLAGS"

83
.github/workflows/proxy-benchmark.yml vendored Normal file
View File

@@ -0,0 +1,83 @@
name: Periodic proxy performance test on unit-perf hetzner runner
on:
push: # TODO: remove after testing
branches:
- test-proxy-bench # Runs on pushes to branches starting with test-proxy-bench
# schedule:
# * is a special character in YAML so you have to quote this string
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
# - cron: '0 5 * * *' # Runs at 5 UTC once a day
workflow_dispatch: # adds an ability to run this manually
defaults:
run:
shell: bash -euo pipefail {0}
concurrency:
group: ${{ github.workflow }}
cancel-in-progress: false
permissions:
contents: read
jobs:
run_periodic_proxybench_test:
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
contents: write
pull-requests: write
runs-on: [self-hosted, unit-perf]
timeout-minutes: 60 # 1h timeout
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
steps:
- name: Checkout proxy-bench Repo
uses: actions/checkout@v4
with:
repository: neondatabase/proxy-bench
path: proxy-bench
- name: Set up the environment which depends on $RUNNER_TEMP on nvme drive
id: set-env
shell: bash -euxo pipefail {0}
run: |
PROXY_BENCH_PATH=$(realpath ./proxy-bench)
{
echo "PROXY_BENCH_PATH=$PROXY_BENCH_PATH"
echo "NEON_DIR=${RUNNER_TEMP}/neon"
echo "TEST_OUTPUT=${PROXY_BENCH_PATH}/test_output"
echo ""
} >> "$GITHUB_ENV"
- name: Run proxy-bench
run: ./${PROXY_BENCH_PATH}/run.sh
- name: Ingest Bench Results # neon repo script
if: success()
run: |
mkdir -p $TEST_OUTPUT
python $NEON_DIR/scripts/proxy_bench_results_ingest.py --out $TEST_OUTPUT
- name: Push Metrics to Proxy perf database
if: success()
env:
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PROXY_TEST_RESULT_CONNSTR }}"
REPORT_FROM: $TEST_OUTPUT
run: $NEON_DIR/scripts/generate_and_push_perf_report.sh
- name: Docker cleanup
run: docker compose down
- name: Notify Failure
if: failure()
run: echo "Proxy bench job failed" && exit 1

View File

@@ -159,8 +159,6 @@ postgres-%: postgres-configure-% \
postgres-headers-% # to prevent `make install` conflicts with neon's `postgres-headers`
+@echo "Compiling PostgreSQL $*"
$(MAKE) -C $(BUILD_DIR)/$* MAKELEVEL=0 install
+@echo "Compiling libpq $*"
$(MAKE) -C $(BUILD_DIR)/$*/src/interfaces/libpq install
+@echo "Compiling pg_prewarm $*"
$(MAKE) -C $(BUILD_DIR)/$*/contrib/pg_prewarm install
+@echo "Compiling pg_buffercache $*"

View File

@@ -22,7 +22,7 @@ sql_exporter.yml: $(jsonnet_files)
--output-file etc/$@ \
--tla-str collector_name=neon_collector \
--tla-str collector_file=neon_collector.yml \
--tla-str 'connection_string=postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable&application_name=sql_exporter' \
--tla-str 'connection_string=postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable&application_name=sql_exporter&pgaudit.log=none' \
etc/sql_exporter.jsonnet
sql_exporter_autoscaling.yml: $(jsonnet_files)
@@ -30,7 +30,7 @@ sql_exporter_autoscaling.yml: $(jsonnet_files)
--output-file etc/$@ \
--tla-str collector_name=neon_collector_autoscaling \
--tla-str collector_file=neon_collector_autoscaling.yml \
--tla-str 'connection_string=postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable&application_name=sql_exporter_autoscaling' \
--tla-str 'connection_string=postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable&application_name=sql_exporter_autoscaling&pgaudit.log=none' \
etc/sql_exporter.jsonnet
.PHONY: clean

View File

@@ -171,9 +171,6 @@ RUN cd postgres && \
eval $CONFIGURE_CMD && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s install && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C contrib/ install && \
# Install headers
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C src/include install && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C src/interfaces/libpq install && \
# Enable some of contrib extensions
echo 'trusted = true' >> /usr/local/pgsql/share/extension/autoinc.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/dblink.control && \
@@ -1568,20 +1565,20 @@ ARG PG_VERSION
WORKDIR /ext-src
RUN case "${PG_VERSION}" in \
"v14") \
export PGAUDIT_VERSION=1.6.2 \
export PGAUDIT_CHECKSUM=1f350d70a0cbf488c0f2b485e3a5c9b11f78ad9e3cbb95ef6904afa1eb3187eb \
export PGAUDIT_VERSION=1.6.3 \
export PGAUDIT_CHECKSUM=37a8f5a7cc8d9188e536d15cf0fdc457fcdab2547caedb54442c37f124110919 \
;; \
"v15") \
export PGAUDIT_VERSION=1.7.0 \
export PGAUDIT_CHECKSUM=8f4a73e451c88c567e516e6cba7dc1e23bc91686bb6f1f77f8f3126d428a8bd8 \
export PGAUDIT_VERSION=1.7.1 \
export PGAUDIT_CHECKSUM=e9c8e6e092d82b2f901d72555ce0fe7780552f35f8985573796cd7e64b09d4ec \
;; \
"v16") \
export PGAUDIT_VERSION=16.0 \
export PGAUDIT_CHECKSUM=d53ef985f2d0b15ba25c512c4ce967dce07b94fd4422c95bd04c4c1a055fe738 \
export PGAUDIT_VERSION=16.1 \
export PGAUDIT_CHECKSUM=3bae908ab70ba0c6f51224009dbcfff1a97bd6104c6273297a64292e1b921fee \
;; \
"v17") \
export PGAUDIT_VERSION=17.0 \
export PGAUDIT_CHECKSUM=7d0d08d030275d525f36cd48b38c6455f1023da863385badff0cec44965bfd8c \
export PGAUDIT_VERSION=17.1 \
export PGAUDIT_CHECKSUM=9c5f37504d393486cc75d2ced83f75f5899be64fa85f689d6babb833b4361e6c \
;; \
*) \
echo "pgaudit is not supported on this PostgreSQL version" && exit 1;; \

View File

@@ -26,7 +26,7 @@ commands:
- name: postgres-exporter
user: nobody
sysvInitAction: respawn
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter" /bin/postgres_exporter --config.file=/etc/postgres_exporter.yml'
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter pgaudit.log=none" /bin/postgres_exporter --config.file=/etc/postgres_exporter.yml'
- name: pgbouncer-exporter
user: postgres
sysvInitAction: respawn
@@ -59,7 +59,7 @@ files:
# the rules use ALL as the hostname. Avoid the pointless lookups and the "unable to
# resolve host" log messages that they generate.
Defaults !fqdn
# Allow postgres user (which is what compute_ctl runs as) to run /neonvm/bin/resize-swap
# and /neonvm/bin/set-disk-quota as root without requiring entering a password (NOPASSWD),
# regardless of hostname (ALL)

View File

@@ -26,7 +26,7 @@ commands:
- name: postgres-exporter
user: nobody
sysvInitAction: respawn
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter" /bin/postgres_exporter --config.file=/etc/postgres_exporter.yml'
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter pgaudit.log=none" /bin/postgres_exporter --config.file=/etc/postgres_exporter.yml'
- name: pgbouncer-exporter
user: postgres
sysvInitAction: respawn
@@ -59,7 +59,7 @@ files:
# the rules use ALL as the hostname. Avoid the pointless lookups and the "unable to
# resolve host" log messages that they generate.
Defaults !fqdn
# Allow postgres user (which is what compute_ctl runs as) to run /neonvm/bin/resize-swap
# and /neonvm/bin/set-disk-quota as root without requiring entering a password (NOPASSWD),
# regardless of hostname (ALL)

View File

@@ -482,10 +482,8 @@ async fn cmd_pgdata(
};
let superuser = "cloud_admin";
let destination_connstring = format!(
"host=localhost port={} user={} dbname=neondb",
pg_port, superuser
);
let destination_connstring =
format!("host=localhost port={pg_port} user={superuser} dbname=neondb");
let pgdata_dir = workdir.join("pgdata");
let mut proc = PostgresProcess::new(pgdata_dir.clone(), pg_bin_dir.clone(), pg_lib_dir.clone());

View File

@@ -69,7 +69,7 @@ impl clap::builder::TypedValueParser for S3Uri {
S3Uri::from_str(value_str).map_err(|e| {
clap::Error::raw(
clap::error::ErrorKind::InvalidValue,
format!("Failed to parse S3 URI: {}", e),
format!("Failed to parse S3 URI: {e}"),
)
})
}

View File

@@ -22,7 +22,7 @@ pub async fn get_dbs_and_roles(compute: &Arc<ComputeNode>) -> anyhow::Result<Cat
spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});
@@ -119,7 +119,7 @@ pub async fn get_database_schema(
_ => {
let mut lines = stderr_reader.lines();
if let Some(line) = lines.next_line().await? {
if line.contains(&format!("FATAL: database \"{}\" does not exist", dbname)) {
if line.contains(&format!("FATAL: database \"{dbname}\" does not exist")) {
return Err(SchemaDumpError::DatabaseDoesNotExist);
}
warn!("pg_dump stderr: {}", line)

View File

@@ -250,8 +250,7 @@ impl ParsedSpec {
// duplicate entry?
if current == previous {
return Err(format!(
"duplicate entry in safekeeper_connstrings: {}!",
current,
"duplicate entry in safekeeper_connstrings: {current}!",
));
}
@@ -406,11 +405,11 @@ impl ComputeNode {
// that can affect `compute_ctl` and prevent it from properly configuring the database schema.
// Unset them via connection string options before connecting to the database.
// N.B. keep it in sync with `ZENITH_OPTIONS` in `get_maintenance_client()`.
const EXTRA_OPTIONS: &str = "-c role=cloud_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0";
const EXTRA_OPTIONS: &str = "-c role=cloud_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0 -c pgaudit.log=none";
let options = match conn_conf.get_options() {
// Allow the control plane to override any options set by the
// compute
Some(options) => format!("{} {}", EXTRA_OPTIONS, options),
Some(options) => format!("{EXTRA_OPTIONS} {options}"),
None => EXTRA_OPTIONS.to_string(),
};
conn_conf.options(&options);
@@ -1127,7 +1126,7 @@ impl ComputeNode {
let sk_configs = sk_connstrs.into_iter().map(|connstr| {
// Format connstr
let id = connstr.clone();
let connstr = format!("postgresql://no_user@{}", connstr);
let connstr = format!("postgresql://no_user@{connstr}");
let options = format!(
"-c timeline_id={} tenant_id={}",
pspec.timeline_id, pspec.tenant_id
@@ -1490,7 +1489,7 @@ impl ComputeNode {
let (mut client, connection) = conf.connect(NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});
@@ -1633,7 +1632,7 @@ impl ComputeNode {
Ok((mut client, connection)) => {
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});
if let Err(e) = handle_migrations(&mut client).await {
@@ -1937,7 +1936,7 @@ impl ComputeNode {
let (client, connection) = connect_result.unwrap();
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});
let result = client
@@ -2106,7 +2105,7 @@ LIMIT 100",
db_client
.simple_query(&query)
.await
.with_context(|| format!("Failed to execute query: {}", query))?;
.with_context(|| format!("Failed to execute query: {query}"))?;
}
Ok(())
@@ -2133,7 +2132,7 @@ LIMIT 100",
let version: Option<ExtVersion> = db_client
.query_opt(version_query, &[&ext_name])
.await
.with_context(|| format!("Failed to execute query: {}", version_query))?
.with_context(|| format!("Failed to execute query: {version_query}"))?
.map(|row| row.get(0));
// sanitize the inputs as postgres idents.
@@ -2148,14 +2147,14 @@ LIMIT 100",
db_client
.simple_query(&query)
.await
.with_context(|| format!("Failed to execute query: {}", query))?;
.with_context(|| format!("Failed to execute query: {query}"))?;
} else {
let query =
format!("CREATE EXTENSION IF NOT EXISTS {ext_name} WITH VERSION {quoted_version}");
db_client
.simple_query(&query)
.await
.with_context(|| format!("Failed to execute query: {}", query))?;
.with_context(|| format!("Failed to execute query: {query}"))?;
}
Ok(ext_version)

View File

@@ -51,7 +51,7 @@ pub fn write_postgres_conf(
// Write the postgresql.conf content from the spec file as is.
if let Some(conf) = &spec.cluster.postgresql_conf {
writeln!(file, "{}", conf)?;
writeln!(file, "{conf}")?;
}
// Add options for connecting to storage
@@ -70,7 +70,7 @@ pub fn write_postgres_conf(
);
// If generation is given, prepend sk list with g#number:
if let Some(generation) = spec.safekeepers_generation {
write!(neon_safekeepers_value, "g#{}:", generation)?;
write!(neon_safekeepers_value, "g#{generation}:")?;
}
neon_safekeepers_value.push_str(&spec.safekeeper_connstrings.join(","));
writeln!(
@@ -109,8 +109,8 @@ pub fn write_postgres_conf(
tls::update_key_path_blocking(pgdata_path, tls_config);
// these are the default, but good to be explicit.
writeln!(file, "ssl_cert_file = '{}'", SERVER_CRT)?;
writeln!(file, "ssl_key_file = '{}'", SERVER_KEY)?;
writeln!(file, "ssl_cert_file = '{SERVER_CRT}'")?;
writeln!(file, "ssl_key_file = '{SERVER_KEY}'")?;
}
// Locales
@@ -191,8 +191,7 @@ pub fn write_postgres_conf(
}
writeln!(
file,
"shared_preload_libraries='{}{}'",
libs, extra_shared_preload_libraries
"shared_preload_libraries='{libs}{extra_shared_preload_libraries}'"
)?;
} else {
// Typically, this should be unreacheable,
@@ -244,8 +243,7 @@ pub fn write_postgres_conf(
}
writeln!(
file,
"shared_preload_libraries='{}{}'",
libs, extra_shared_preload_libraries
"shared_preload_libraries='{libs}{extra_shared_preload_libraries}'"
)?;
} else {
// Typically, this should be unreacheable,
@@ -263,7 +261,7 @@ pub fn write_postgres_conf(
}
}
writeln!(file, "neon.extension_server_port={}", extension_server_port)?;
writeln!(file, "neon.extension_server_port={extension_server_port}")?;
if spec.drop_subscriptions_before_start {
writeln!(file, "neon.disable_logical_replication_subscribers=true")?;
@@ -291,7 +289,7 @@ where
{
let path = pgdata_path.join("compute_ctl_temp_override.conf");
let mut file = File::create(path)?;
write!(file, "{}", options)?;
write!(file, "{options}")?;
let res = exec();

View File

@@ -296,10 +296,7 @@ async fn download_extension_tar(remote_ext_base_url: &Url, ext_path: &str) -> Re
async fn do_extension_server_request(uri: Url) -> Result<Bytes, (String, String)> {
let resp = reqwest::get(uri).await.map_err(|e| {
(
format!(
"could not perform remote extensions server request: {:?}",
e
),
format!("could not perform remote extensions server request: {e:?}"),
UNKNOWN_HTTP_STATUS.to_string(),
)
})?;
@@ -309,7 +306,7 @@ async fn do_extension_server_request(uri: Url) -> Result<Bytes, (String, String)
StatusCode::OK => match resp.bytes().await {
Ok(resp) => Ok(resp),
Err(e) => Err((
format!("could not read remote extensions server response: {:?}", e),
format!("could not read remote extensions server response: {e:?}"),
// It's fine to return and report error with status as 200 OK,
// because we still failed to read the response.
status.to_string(),
@@ -320,10 +317,7 @@ async fn do_extension_server_request(uri: Url) -> Result<Bytes, (String, String)
status.to_string(),
)),
_ => Err((
format!(
"unexpected remote extensions server response status code: {}",
status
),
format!("unexpected remote extensions server response status code: {status}"),
status.to_string(),
)),
}

View File

@@ -65,7 +65,7 @@ pub(in crate::http) async fn configure(
if state.status == ComputeStatus::Failed {
let err = state.error.as_ref().map_or("unknown error", |x| x);
let msg = format!("compute configuration failed: {:?}", err);
let msg = format!("compute configuration failed: {err:?}");
return Err(msg);
}
}

View File

@@ -43,7 +43,7 @@ pub async fn get_installed_extensions(mut conf: Config) -> Result<InstalledExten
let (mut client, connection) = conf.connect(NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});
@@ -57,7 +57,7 @@ pub async fn get_installed_extensions(mut conf: Config) -> Result<InstalledExten
let (client, connection) = conf.connect(NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});

View File

@@ -130,7 +130,7 @@ fn try_acquire_lsn_lease(
lsn: Lsn,
) -> Result<Option<SystemTime>> {
let mut client = config.connect(NoTls)?;
let cmd = format!("lease lsn {} {} {} ", tenant_shard_id, timeline_id, lsn);
let cmd = format!("lease lsn {tenant_shard_id} {timeline_id} {lsn} ");
let res = client.simple_query(&cmd)?;
let msg = match res.first() {
Some(msg) => msg,

View File

@@ -36,9 +36,9 @@ pub fn escape_literal(s: &str) -> String {
let res = s.replace('\'', "''").replace('\\', "\\\\");
if res.contains('\\') {
format!("E'{}'", res)
format!("E'{res}'")
} else {
format!("'{}'", res)
format!("'{res}'")
}
}
@@ -46,7 +46,7 @@ pub fn escape_literal(s: &str) -> String {
/// with `'{}'` is not required, as it returns a ready-to-use config string.
pub fn escape_conf_value(s: &str) -> String {
let res = s.replace('\'', "''").replace('\\', "\\\\");
format!("'{}'", res)
format!("'{res}'")
}
pub trait GenericOptionExt {
@@ -446,7 +446,7 @@ pub async fn tune_pgbouncer(
let mut pgbouncer_connstr =
"host=localhost port=6432 dbname=pgbouncer user=postgres sslmode=disable".to_string();
if let Ok(pass) = std::env::var("PGBOUNCER_PASSWORD") {
pgbouncer_connstr.push_str(format!(" password={}", pass).as_str());
pgbouncer_connstr.push_str(format!(" password={pass}").as_str());
}
pgbouncer_connstr
};
@@ -464,7 +464,7 @@ pub async fn tune_pgbouncer(
Ok((client, connection)) => {
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});
break client;

View File

@@ -23,12 +23,12 @@ fn do_control_plane_request(
) -> Result<ControlPlaneConfigResponse, (bool, String, String)> {
let resp = reqwest::blocking::Client::new()
.get(uri)
.header("Authorization", format!("Bearer {}", jwt))
.header("Authorization", format!("Bearer {jwt}"))
.send()
.map_err(|e| {
(
true,
format!("could not perform request to control plane: {:?}", e),
format!("could not perform request to control plane: {e:?}"),
UNKNOWN_HTTP_STATUS.to_string(),
)
})?;
@@ -39,7 +39,7 @@ fn do_control_plane_request(
Ok(spec_resp) => Ok(spec_resp),
Err(e) => Err((
true,
format!("could not deserialize control plane response: {:?}", e),
format!("could not deserialize control plane response: {e:?}"),
status.to_string(),
)),
},
@@ -62,7 +62,7 @@ fn do_control_plane_request(
// or some internal failure happened. Doesn't make much sense to retry in this case.
_ => Err((
false,
format!("unexpected control plane response status code: {}", status),
format!("unexpected control plane response status code: {status}"),
status.to_string(),
)),
}

View File

@@ -933,56 +933,53 @@ async fn get_operations<'a>(
PerDatabasePhase::DeleteDBRoleReferences => {
let ctx = ctx.read().await;
let operations =
spec.delta_operations
.iter()
.flatten()
.filter(|op| op.action == "delete_role")
.filter_map(move |op| {
if db.is_owned_by(&op.name) {
return None;
}
if !ctx.roles.contains_key(&op.name) {
return None;
}
let quoted = op.name.pg_quote();
let new_owner = match &db {
DB::SystemDB => PgIdent::from("cloud_admin").pg_quote(),
DB::UserDB(db) => db.owner.pg_quote(),
};
let (escaped_role, outer_tag) = op.name.pg_quote_dollar();
let operations = spec
.delta_operations
.iter()
.flatten()
.filter(|op| op.action == "delete_role")
.filter_map(move |op| {
if db.is_owned_by(&op.name) {
return None;
}
if !ctx.roles.contains_key(&op.name) {
return None;
}
let quoted = op.name.pg_quote();
let new_owner = match &db {
DB::SystemDB => PgIdent::from("cloud_admin").pg_quote(),
DB::UserDB(db) => db.owner.pg_quote(),
};
let (escaped_role, outer_tag) = op.name.pg_quote_dollar();
Some(vec![
// This will reassign all dependent objects to the db owner
Operation {
query: format!(
"REASSIGN OWNED BY {} TO {}",
quoted, new_owner,
),
comment: None,
},
// Revoke some potentially blocking privileges (Neon-specific currently)
Operation {
query: format!(
include_str!("sql/pre_drop_role_revoke_privileges.sql"),
// N.B. this has to be properly dollar-escaped with `pg_quote_dollar()`
role_name = escaped_role,
outer_tag = outer_tag,
),
comment: None,
},
// This now will only drop privileges of the role
// TODO: this is obviously not 100% true because of the above case,
// there could be still some privileges that are not revoked. Maybe this
// only drops privileges that were granted *by this* role, not *to this* role,
// but this has to be checked.
Operation {
query: format!("DROP OWNED BY {}", quoted),
comment: None,
},
])
})
.flatten();
Some(vec![
// This will reassign all dependent objects to the db owner
Operation {
query: format!("REASSIGN OWNED BY {quoted} TO {new_owner}",),
comment: None,
},
// Revoke some potentially blocking privileges (Neon-specific currently)
Operation {
query: format!(
include_str!("sql/pre_drop_role_revoke_privileges.sql"),
// N.B. this has to be properly dollar-escaped with `pg_quote_dollar()`
role_name = escaped_role,
outer_tag = outer_tag,
),
comment: None,
},
// This now will only drop privileges of the role
// TODO: this is obviously not 100% true because of the above case,
// there could be still some privileges that are not revoked. Maybe this
// only drops privileges that were granted *by this* role, not *to this* role,
// but this has to be checked.
Operation {
query: format!("DROP OWNED BY {quoted}"),
comment: None,
},
])
})
.flatten();
Ok(Box::new(operations))
}

View File

@@ -27,7 +27,7 @@ pub async fn ping_safekeeper(
let (client, conn) = config.connect(tokio_postgres::NoTls).await?;
tokio::spawn(async move {
if let Err(e) = conn.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});

View File

@@ -919,7 +919,7 @@ fn print_timeline(
br_sym = "┗━";
}
print!("{} @{}: ", br_sym, ancestor_lsn);
print!("{br_sym} @{ancestor_lsn}: ");
}
// Finally print a timeline id and name with new line
@@ -1742,7 +1742,7 @@ async fn handle_pageserver(subcmd: &PageserverCmd, env: &local_env::LocalEnv) ->
StopMode::Immediate => true,
};
if let Err(e) = get_pageserver(env, args.pageserver_id)?.stop(immediate) {
eprintln!("pageserver stop failed: {}", e);
eprintln!("pageserver stop failed: {e}");
exit(1);
}
}
@@ -1751,7 +1751,7 @@ async fn handle_pageserver(subcmd: &PageserverCmd, env: &local_env::LocalEnv) ->
let pageserver = get_pageserver(env, args.pageserver_id)?;
//TODO what shutdown strategy should we use here?
if let Err(e) = pageserver.stop(false) {
eprintln!("pageserver stop failed: {}", e);
eprintln!("pageserver stop failed: {e}");
exit(1);
}
@@ -1768,7 +1768,7 @@ async fn handle_pageserver(subcmd: &PageserverCmd, env: &local_env::LocalEnv) ->
{
Ok(_) => println!("Page server is up and running"),
Err(err) => {
eprintln!("Page server is not available: {}", err);
eprintln!("Page server is not available: {err}");
exit(1);
}
}
@@ -1805,7 +1805,7 @@ async fn handle_storage_controller(
},
};
if let Err(e) = svc.stop(stop_args).await {
eprintln!("stop failed: {}", e);
eprintln!("stop failed: {e}");
exit(1);
}
}
@@ -1827,7 +1827,7 @@ async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) ->
let safekeeper = get_safekeeper(env, args.id)?;
if let Err(e) = safekeeper.start(&args.extra_opt, &args.start_timeout).await {
eprintln!("safekeeper start failed: {}", e);
eprintln!("safekeeper start failed: {e}");
exit(1);
}
}
@@ -1839,7 +1839,7 @@ async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) ->
StopMode::Immediate => true,
};
if let Err(e) = safekeeper.stop(immediate) {
eprintln!("safekeeper stop failed: {}", e);
eprintln!("safekeeper stop failed: {e}");
exit(1);
}
}
@@ -1852,12 +1852,12 @@ async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) ->
};
if let Err(e) = safekeeper.stop(immediate) {
eprintln!("safekeeper stop failed: {}", e);
eprintln!("safekeeper stop failed: {e}");
exit(1);
}
if let Err(e) = safekeeper.start(&args.extra_opt, &args.start_timeout).await {
eprintln!("safekeeper start failed: {}", e);
eprintln!("safekeeper start failed: {e}");
exit(1);
}
}
@@ -2113,7 +2113,7 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
let storage = EndpointStorage::from_env(env);
if let Err(e) = storage.stop(immediate) {
eprintln!("endpoint_storage stop failed: {:#}", e);
eprintln!("endpoint_storage stop failed: {e:#}");
}
for ps_conf in &env.pageservers {

View File

@@ -847,10 +847,10 @@ impl Endpoint {
// Launch compute_ctl
let conn_str = self.connstr("cloud_admin", "postgres");
println!("Starting postgres node at '{}'", conn_str);
println!("Starting postgres node at '{conn_str}'");
if create_test_user {
let conn_str = self.connstr("test", "neondb");
println!("Also at '{}'", conn_str);
println!("Also at '{conn_str}'");
}
let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
cmd.args([
@@ -949,8 +949,7 @@ impl Endpoint {
Err(e) => {
if Instant::now().duration_since(start_at) > start_timeout {
return Err(e).context(format!(
"timed out {:?} waiting to connect to compute_ctl HTTP",
start_timeout,
"timed out {start_timeout:?} waiting to connect to compute_ctl HTTP",
));
}
}
@@ -989,7 +988,7 @@ impl Endpoint {
// reqwest does not export its error construction utility functions, so let's craft the message ourselves
let url = response.url().to_owned();
let msg = match response.text().await {
Ok(err_body) => format!("Error: {}", err_body),
Ok(err_body) => format!("Error: {err_body}"),
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
};
Err(anyhow::anyhow!(msg))
@@ -1055,7 +1054,7 @@ impl Endpoint {
} else {
let url = response.url().to_owned();
let msg = match response.text().await {
Ok(err_body) => format!("Error: {}", err_body),
Ok(err_body) => format!("Error: {err_body}"),
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
};
Err(anyhow::anyhow!(msg))

View File

@@ -212,6 +212,8 @@ pub struct NeonStorageControllerConf {
pub use_local_compute_notifications: bool,
pub timeline_safekeeper_count: Option<i64>,
pub kick_secondary_downloads: Option<bool>,
}
impl NeonStorageControllerConf {
@@ -243,6 +245,7 @@ impl Default for NeonStorageControllerConf {
use_https_safekeeper_api: false,
use_local_compute_notifications: true,
timeline_safekeeper_count: None,
kick_secondary_downloads: None,
}
}
}
@@ -258,7 +261,7 @@ impl Default for EndpointStorageConf {
impl NeonBroker {
pub fn client_url(&self) -> Url {
let url = if let Some(addr) = self.listen_https_addr {
format!("https://{}", addr)
format!("https://{addr}")
} else {
format!(
"http://{}",
@@ -727,7 +730,7 @@ impl LocalEnv {
let config_toml_path = dentry.path().join("pageserver.toml");
let config_toml: PageserverConfigTomlSubset = toml_edit::de::from_str(
&std::fs::read_to_string(&config_toml_path)
.with_context(|| format!("read {:?}", config_toml_path))?,
.with_context(|| format!("read {config_toml_path:?}"))?,
)
.context("parse pageserver.toml")?;
let identity_toml_path = dentry.path().join("identity.toml");
@@ -737,7 +740,7 @@ impl LocalEnv {
}
let identity_toml: IdentityTomlSubset = toml_edit::de::from_str(
&std::fs::read_to_string(&identity_toml_path)
.with_context(|| format!("read {:?}", identity_toml_path))?,
.with_context(|| format!("read {identity_toml_path:?}"))?,
)
.context("parse identity.toml")?;
let PageserverConfigTomlSubset {

View File

@@ -122,7 +122,7 @@ impl PageServerNode {
.env
.generate_auth_token(&Claims::new(None, Scope::GenerationsApi))
.unwrap();
overrides.push(format!("control_plane_api_token='{}'", jwt_token));
overrides.push(format!("control_plane_api_token='{jwt_token}'"));
}
if !conf.other.contains_key("remote_storage") {

View File

@@ -143,7 +143,7 @@ impl SafekeeperNode {
let id_string = id.to_string();
// TODO: add availability_zone to the config.
// Right now we just specify any value here and use it to check metrics in tests.
let availability_zone = format!("sk-{}", id_string);
let availability_zone = format!("sk-{id_string}");
let mut args = vec![
"-D".to_owned(),

View File

@@ -167,7 +167,7 @@ impl StorageController {
fn storage_controller_instance_dir(&self, instance_id: u8) -> PathBuf {
self.env
.base_data_dir
.join(format!("storage_controller_{}", instance_id))
.join(format!("storage_controller_{instance_id}"))
}
fn pid_file(&self, instance_id: u8) -> Utf8PathBuf {
@@ -226,7 +226,7 @@ impl StorageController {
"-d",
DB_NAME,
"-p",
&format!("{}", postgres_port),
&format!("{postgres_port}"),
];
let pg_lib_dir = self.get_pg_lib_dir().await.unwrap();
let envs = [
@@ -269,7 +269,7 @@ impl StorageController {
"-h",
"localhost",
"-p",
&format!("{}", postgres_port),
&format!("{postgres_port}"),
"-U",
&username(),
"-O",
@@ -431,7 +431,7 @@ impl StorageController {
// from `LocalEnv`'s config file (`.neon/config`).
tokio::fs::write(
&pg_data_path.join("postgresql.conf"),
format!("port = {}\nfsync=off\n", postgres_port),
format!("port = {postgres_port}\nfsync=off\n"),
)
.await?;
@@ -483,7 +483,7 @@ impl StorageController {
self.setup_database(postgres_port).await?;
}
let database_url = format!("postgresql://localhost:{}/{DB_NAME}", postgres_port);
let database_url = format!("postgresql://localhost:{postgres_port}/{DB_NAME}");
// We support running a startup SQL script to fiddle with the database before we launch storcon.
// This is used by the test suite.
@@ -514,7 +514,7 @@ impl StorageController {
drop(client);
conn.await??;
let addr = format!("{}:{}", host, listen_port);
let addr = format!("{host}:{listen_port}");
let address_for_peers = Uri::builder()
.scheme(scheme)
.authority(addr.clone())
@@ -563,6 +563,10 @@ impl StorageController {
args.push("--use-local-compute-notifications".to_string());
}
if let Some(value) = self.config.kick_secondary_downloads {
args.push(format!("--kick-secondary-downloads={value}"));
}
if let Some(ssl_ca_file) = self.env.ssl_ca_cert_path() {
args.push(format!("--ssl-ca-file={}", ssl_ca_file.to_str().unwrap()));
}
@@ -812,9 +816,9 @@ impl StorageController {
builder = builder.json(&body)
}
if let Some(private_key) = &self.private_key {
println!("Getting claims for path {}", path);
println!("Getting claims for path {path}");
if let Some(required_claims) = Self::get_claims_for_path(&path)? {
println!("Got claims {:?} for path {}", required_claims, path);
println!("Got claims {required_claims:?} for path {path}");
let jwt_token = encode_from_key_file(&required_claims, private_key)?;
builder = builder.header(
reqwest::header::AUTHORIZATION,

View File

@@ -649,7 +649,7 @@ async fn main() -> anyhow::Result<()> {
response
.new_shards
.iter()
.map(|s| format!("{:?}", s))
.map(|s| format!("{s:?}"))
.collect::<Vec<_>>()
.join(",")
);
@@ -771,8 +771,8 @@ async fn main() -> anyhow::Result<()> {
println!("Tenant {tenant_id}");
let mut table = comfy_table::Table::new();
table.add_row(["Policy", &format!("{:?}", policy)]);
table.add_row(["Stripe size", &format!("{:?}", stripe_size)]);
table.add_row(["Policy", &format!("{policy:?}")]);
table.add_row(["Stripe size", &format!("{stripe_size:?}")]);
table.add_row(["Config", &serde_json::to_string_pretty(&config).unwrap()]);
println!("{table}");
println!("Shards:");
@@ -789,7 +789,7 @@ async fn main() -> anyhow::Result<()> {
let secondary = shard
.node_secondary
.iter()
.map(|n| format!("{}", n))
.map(|n| format!("{n}"))
.collect::<Vec<_>>()
.join(",");
@@ -863,7 +863,7 @@ async fn main() -> anyhow::Result<()> {
}
} else {
// Make it obvious to the user that since they've omitted an AZ, we're clearing it
eprintln!("Clearing preferred AZ for tenant {}", tenant_id);
eprintln!("Clearing preferred AZ for tenant {tenant_id}");
}
// Construct a request that modifies all the tenant's shards
@@ -1134,8 +1134,7 @@ async fn main() -> anyhow::Result<()> {
Err((tenant_shard_id, from, to, error)) => {
failure += 1;
println!(
"Failed to migrate {} from node {} to node {}: {}",
tenant_shard_id, from, to, error
"Failed to migrate {tenant_shard_id} from node {from} to node {to}: {error}"
);
}
}
@@ -1277,8 +1276,7 @@ async fn main() -> anyhow::Result<()> {
concurrency,
} => {
let mut path = format!(
"/v1/tenant/{}/timeline/{}/download_heatmap_layers",
tenant_shard_id, timeline_id,
"/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers",
);
if let Some(c) = concurrency {
@@ -1303,8 +1301,7 @@ async fn watch_tenant_shard(
) -> anyhow::Result<()> {
if let Some(until_migrated_to) = until_migrated_to {
println!(
"Waiting for tenant shard {} to be migrated to node {}",
tenant_shard_id, until_migrated_to
"Waiting for tenant shard {tenant_shard_id} to be migrated to node {until_migrated_to}"
);
}
@@ -1327,7 +1324,7 @@ async fn watch_tenant_shard(
"attached: {} secondary: {} {}",
shard
.node_attached
.map(|n| format!("{}", n))
.map(|n| format!("{n}"))
.unwrap_or("none".to_string()),
shard
.node_secondary
@@ -1341,15 +1338,12 @@ async fn watch_tenant_shard(
"(reconciler idle)"
}
);
println!("{}", summary);
println!("{summary}");
// Maybe drop out if we finished migration
if let Some(until_migrated_to) = until_migrated_to {
if shard.node_attached == Some(until_migrated_to) && !shard.is_reconciling {
println!(
"Tenant shard {} is now on node {}",
tenant_shard_id, until_migrated_to
);
println!("Tenant shard {tenant_shard_id} is now on node {until_migrated_to}");
break;
}
}

View File

@@ -374,7 +374,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
let request = Request::builder()
.uri(format!("/{tenant}/{timeline}/{endpoint}/sub/path/key"))
.method(method)
.header("Authorization", format!("Bearer {}", token))
.header("Authorization", format!("Bearer {token}"))
.body(Body::empty())
.unwrap();
let status = ServiceExt::ready(&mut app)

View File

@@ -71,7 +71,7 @@ impl Runtime {
debug!("thread panicked: {:?}", e);
let mut result = ctx.result.lock();
if result.0 == -1 {
*result = (256, format!("thread panicked: {:?}", e));
*result = (256, format!("thread panicked: {e:?}"));
}
});
}

View File

@@ -47,8 +47,8 @@ impl Debug for AnyMessage {
match self {
AnyMessage::None => write!(f, "None"),
AnyMessage::InternalConnect => write!(f, "InternalConnect"),
AnyMessage::Just32(v) => write!(f, "Just32({})", v),
AnyMessage::ReplCell(v) => write!(f, "ReplCell({:?})", v),
AnyMessage::Just32(v) => write!(f, "Just32({v})"),
AnyMessage::ReplCell(v) => write!(f, "ReplCell({v:?})"),
AnyMessage::Bytes(v) => write!(f, "Bytes({})", hex::encode(v)),
AnyMessage::LSN(v) => write!(f, "LSN({})", Lsn(*v)),
}

View File

@@ -582,14 +582,14 @@ pub fn attach_openapi_ui(
deepLinking: true,
showExtensions: true,
showCommonExtensions: true,
url: "{}",
url: "{spec_mount_path}",
}})
window.ui = ui;
}};
</script>
</body>
</html>
"#, spec_mount_path))).unwrap())
"#))).unwrap())
})
)
}
@@ -696,7 +696,7 @@ mod tests {
let remote_addr = SocketAddr::new(IpAddr::from_str("127.0.0.1").unwrap(), 80);
let mut service = builder.build(remote_addr);
if let Err(e) = poll_fn(|ctx| service.poll_ready(ctx)).await {
panic!("request service is not ready: {:?}", e);
panic!("request service is not ready: {e:?}");
}
let mut req: Request<Body> = Request::default();
@@ -716,7 +716,7 @@ mod tests {
let remote_addr = SocketAddr::new(IpAddr::from_str("127.0.0.1").unwrap(), 80);
let mut service = builder.build(remote_addr);
if let Err(e) = poll_fn(|ctx| service.poll_ready(ctx)).await {
panic!("request service is not ready: {:?}", e);
panic!("request service is not ready: {e:?}");
}
let req: Request<Body> = Request::default();

View File

@@ -86,7 +86,7 @@ impl ShmemHandle {
// somewhat smaller than that, because with anything close to that, you'll run out of
// memory anyway.
if max_size >= 1 << 48 {
panic!("max size {} too large", max_size);
panic!("max size {max_size} too large");
}
if initial_size > max_size {
panic!("initial size {initial_size} larger than max size {max_size}");
@@ -279,7 +279,7 @@ mod tests {
fn assert_range(ptr: *const u8, expected: u8, range: Range<usize>) {
for i in range {
let b = unsafe { *(ptr.add(i)) };
assert_eq!(expected, b, "unexpected byte at offset {}", i);
assert_eq!(expected, b, "unexpected byte at offset {i}");
}
}

View File

@@ -76,6 +76,10 @@ pub struct PostHogConfig {
pub private_api_url: String,
/// Public API URL
pub public_api_url: String,
/// Refresh interval for the feature flag spec
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "humantime_serde")]
pub refresh_interval: Option<Duration>,
}
/// `pageserver.toml`

View File

@@ -577,8 +577,7 @@ mod test {
let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
assert!(
err.to_string().contains("unknown field `unknown_field`"),
"expect unknown field `unknown_field` error, got: {}",
err
"expect unknown field `unknown_field` error, got: {err}"
);
}

View File

@@ -334,8 +334,7 @@ impl KeySpace {
std::cmp::max(range.start, prev.start) < std::cmp::min(range.end, prev.end);
assert!(
!overlap,
"Attempt to merge ovelapping keyspaces: {:?} overlaps {:?}",
prev, range
"Attempt to merge ovelapping keyspaces: {prev:?} overlaps {range:?}"
);
}
@@ -1104,7 +1103,7 @@ mod tests {
// total range contains at least one shard-local page
let all_nonzero = fragments.iter().all(|f| f.0 > 0);
if !all_nonzero {
eprintln!("Found a zero-length fragment: {:?}", fragments);
eprintln!("Found a zero-length fragment: {fragments:?}");
}
assert!(all_nonzero);
} else {

View File

@@ -1183,7 +1183,7 @@ impl Display for ImageCompressionAlgorithm {
ImageCompressionAlgorithm::Disabled => write!(f, "disabled"),
ImageCompressionAlgorithm::Zstd { level } => {
if let Some(level) = level {
write!(f, "zstd({})", level)
write!(f, "zstd({level})")
} else {
write!(f, "zstd")
}
@@ -2012,8 +2012,7 @@ mod tests {
let err = serde_json::from_value::<TenantConfigRequest>(config_request).unwrap_err();
assert!(
err.to_string().contains("unknown field `unknown_field`"),
"expect unknown field `unknown_field` error, got: {}",
err
"expect unknown field `unknown_field` error, got: {err}"
);
}

View File

@@ -939,7 +939,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackendReader<IO> {
FeMessage::CopyFail => Err(CopyStreamHandlerEnd::CopyFail),
FeMessage::Terminate => Err(CopyStreamHandlerEnd::Terminate),
_ => Err(CopyStreamHandlerEnd::from(ConnectionError::Protocol(
ProtocolError::Protocol(format!("unexpected message in COPY stream {:?}", msg)),
ProtocolError::Protocol(format!("unexpected message in COPY stream {msg:?}")),
))),
},
None => Err(CopyStreamHandlerEnd::EOF),

View File

@@ -61,7 +61,7 @@ async fn simple_select() {
// so spawn it off to run on its own.
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});
@@ -137,7 +137,7 @@ async fn simple_select_ssl() {
// so spawn it off to run on its own.
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});

View File

@@ -223,7 +223,7 @@ mod tests_pg_connection_config {
assert_eq!(cfg.port(), 123);
assert_eq!(cfg.raw_address(), "stub.host.example:123");
assert_eq!(
format!("{:?}", cfg),
format!("{cfg:?}"),
"PgConnectionConfig { host: Domain(\"stub.host.example\"), port: 123, password: None }"
);
}
@@ -239,7 +239,7 @@ mod tests_pg_connection_config {
assert_eq!(cfg.port(), 123);
assert_eq!(cfg.raw_address(), "[::1]:123");
assert_eq!(
format!("{:?}", cfg),
format!("{cfg:?}"),
"PgConnectionConfig { host: Ipv6(::1), port: 123, password: None }"
);
}
@@ -252,7 +252,7 @@ mod tests_pg_connection_config {
assert_eq!(cfg.port(), 123);
assert_eq!(cfg.raw_address(), "stub.host.example:123");
assert_eq!(
format!("{:?}", cfg),
format!("{cfg:?}"),
"PgConnectionConfig { host: Domain(\"stub.host.example\"), port: 123, password: Some(REDACTED-STRING) }"
);
}

View File

@@ -114,7 +114,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder {
let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf).map_err(
|e| WalDecodeError {
msg: format!("long header deserialization failed {}", e),
msg: format!("long header deserialization failed {e}"),
lsn: self.lsn,
},
)?;
@@ -130,7 +130,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder {
let hdr =
XLogPageHeaderData::from_bytes(&mut self.inputbuf).map_err(|e| {
WalDecodeError {
msg: format!("header deserialization failed {}", e),
msg: format!("header deserialization failed {e}"),
lsn: self.lsn,
}
})?;
@@ -155,7 +155,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder {
let xl_tot_len = (&self.inputbuf[0..4]).get_u32_le();
if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
return Err(WalDecodeError {
msg: format!("invalid xl_tot_len {}", xl_tot_len),
msg: format!("invalid xl_tot_len {xl_tot_len}"),
lsn: self.lsn,
});
}
@@ -218,7 +218,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder {
let xlogrec =
XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]).map_err(|e| {
WalDecodeError {
msg: format!("xlog record deserialization failed {}", e),
msg: format!("xlog record deserialization failed {e}"),
lsn: self.lsn,
}
})?;

View File

@@ -1196,7 +1196,7 @@ pub fn describe_postgres_wal_record(record: &Bytes) -> Result<String, Deserializ
pg_constants::XLOG_HEAP2_MULTI_INSERT => "HEAP2 MULTI_INSERT",
pg_constants::XLOG_HEAP2_VISIBLE => "HEAP2 VISIBLE",
_ => {
unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
unknown_str = format!("HEAP2 UNKNOWN_0x{info:02x}");
&unknown_str
}
}
@@ -1209,7 +1209,7 @@ pub fn describe_postgres_wal_record(record: &Bytes) -> Result<String, Deserializ
pg_constants::XLOG_HEAP_UPDATE => "HEAP UPDATE",
pg_constants::XLOG_HEAP_HOT_UPDATE => "HEAP HOT_UPDATE",
_ => {
unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
unknown_str = format!("HEAP2 UNKNOWN_0x{info:02x}");
&unknown_str
}
}
@@ -1220,7 +1220,7 @@ pub fn describe_postgres_wal_record(record: &Bytes) -> Result<String, Deserializ
pg_constants::XLOG_FPI => "XLOG FPI",
pg_constants::XLOG_FPI_FOR_HINT => "XLOG FPI_FOR_HINT",
_ => {
unknown_str = format!("XLOG UNKNOWN_0x{:02x}", info);
unknown_str = format!("XLOG UNKNOWN_0x{info:02x}");
&unknown_str
}
}
@@ -1228,7 +1228,7 @@ pub fn describe_postgres_wal_record(record: &Bytes) -> Result<String, Deserializ
rmid => {
let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
unknown_str = format!("UNKNOWN_RM_{} INFO_0x{:02x}", rmid, info);
unknown_str = format!("UNKNOWN_RM_{rmid} INFO_0x{info:02x}");
&unknown_str
}
};

View File

@@ -34,7 +34,7 @@ fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
let cfg = Conf {
pg_version,
pg_distrib_dir: top_path.join("pg_install"),
datadir: top_path.join(format!("test_output/{}-{PG_MAJORVERSION}", test_name)),
datadir: top_path.join(format!("test_output/{test_name}-{PG_MAJORVERSION}")),
};
if cfg.datadir.exists() {
fs::remove_dir_all(&cfg.datadir).unwrap();

View File

@@ -32,15 +32,15 @@ pub enum Error {
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Error::Spawn(e) => write!(f, "Error spawning command: {:?}", e),
Error::Spawn(e) => write!(f, "Error spawning command: {e:?}"),
Error::Failed { status, stderr } => write!(
f,
"Command failed with status {:?}: {}",
status,
String::from_utf8_lossy(stderr)
),
Error::WaitOutput(e) => write!(f, "Error waiting for command output: {:?}", e),
Error::Other(e) => write!(f, "Error: {:?}", e),
Error::WaitOutput(e) => write!(f, "Error waiting for command output: {e:?}"),
Error::Other(e) => write!(f, "Error: {e:?}"),
}
}
}

View File

@@ -1,4 +1,3 @@
use serde::ser::SerializeTuple;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_repr::{Deserialize_repr, Serialize_repr};
use std::fmt::{Display, Formatter};
@@ -79,12 +78,12 @@ impl PgMajorVersion {
///
/// The PG_VERSION file is used to determine the PostgreSQL version that currently
/// owns the data in a PostgreSQL data directory.
pub fn versionfile_string(&self) -> String {
pub fn versionfile_string(&self) -> &'static str {
match self {
PgMajorVersion::PG17 => "17\x0A".to_string(),
PgMajorVersion::PG16 => "16\x0A".to_string(),
PgMajorVersion::PG15 => "15".to_string(),
PgMajorVersion::PG14 => "14".to_string(),
PgMajorVersion::PG14 => "14",
PgMajorVersion::PG15 => "15",
PgMajorVersion::PG16 => "16\x0A",
PgMajorVersion::PG17 => "17\x0A",
}
}
@@ -94,15 +93,16 @@ impl PgMajorVersion {
/// implementation.
pub fn v_str(&self) -> String {
match self {
PgMajorVersion::PG17 => "v17".to_string(),
PgMajorVersion::PG16 => "v16".to_string(),
PgMajorVersion::PG15 => "v15".to_string(),
PgMajorVersion::PG14 => "v14".to_string(),
PgMajorVersion::PG14 => "v14",
PgMajorVersion::PG15 => "v15",
PgMajorVersion::PG16 => "v16",
PgMajorVersion::PG17 => "v17",
}
.to_string()
}
/// All currently supported major versions of PostgreSQL.
pub const ALL: [PgMajorVersion; 4] = [
pub const ALL: &'static [PgMajorVersion] = &[
PgMajorVersion::PG14,
PgMajorVersion::PG15,
PgMajorVersion::PG16,
@@ -112,20 +112,12 @@ impl PgMajorVersion {
impl Display for PgMajorVersion {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
PgMajorVersion::PG14 => {
write!(f, "PgMajorVersion::PG14")
}
PgMajorVersion::PG15 => {
write!(f, "PgMajorVersion::PG15")
}
PgMajorVersion::PG16 => {
write!(f, "PgMajorVersion::PG16")
}
PgMajorVersion::PG17 => {
write!(f, "PgMajorVersion::PG17")
}
}
f.write_str(match self {
PgMajorVersion::PG14 => "PgMajorVersion::PG14",
PgMajorVersion::PG15 => "PgMajorVersion::PG15",
PgMajorVersion::PG16 => "PgMajorVersion::PG16",
PgMajorVersion::PG17 => "PgMajorVersion::PG17",
})
}
}
@@ -135,8 +127,7 @@ pub struct InvalidPgVersion(u32);
impl Display for InvalidPgVersion {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.serialize_tuple_struct("InvalidPgVersion", 1)?
.serialize_element(&self.0)
write!(f, "InvalidPgVersion({})", self.0)
}
}
@@ -165,8 +156,7 @@ pub struct PgMajorVersionParseError(String);
impl Display for PgMajorVersionParseError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.serialize_tuple_struct("PgMajorVersionParseError", 1)?
.serialize_element(&self.0)
write!(f, "PgMajorVersionParseError({})", self.0)
}
}
@@ -174,12 +164,12 @@ impl FromStr for PgMajorVersion {
type Err = PgMajorVersionParseError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"14" => Ok(PgMajorVersion::PG14),
"15" => Ok(PgMajorVersion::PG15),
"16" => Ok(PgMajorVersion::PG16),
"17" => Ok(PgMajorVersion::PG17),
_ => Err(PgMajorVersionParseError(s.to_string())),
}
Ok(match s {
"14" => PgMajorVersion::PG14,
"15" => PgMajorVersion::PG15,
"16" => PgMajorVersion::PG16,
"17" => PgMajorVersion::PG17,
_ => return Err(PgMajorVersionParseError(s.to_string())),
})
}
}

View File

@@ -36,7 +36,10 @@ impl FeatureResolverBackgroundLoop {
// Main loop of updating the feature flags.
handle.spawn(
async move {
tracing::info!("Starting PostHog feature resolver");
tracing::info!(
"Starting PostHog feature resolver with refresh period: {:?}",
refresh_period
);
let mut ticker = tokio::time::interval(refresh_period);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {

View File

@@ -168,15 +168,13 @@ impl FeatureStore {
let PostHogFlagFilterPropertyValue::String(provided) = provided else {
// Left should be a string
return Err(PostHogEvaluationError::Internal(format!(
"The left side of the condition is not a string: {:?}",
provided
"The left side of the condition is not a string: {provided:?}"
)));
};
let PostHogFlagFilterPropertyValue::List(requested) = requested else {
// Right should be a list of string
return Err(PostHogEvaluationError::Internal(format!(
"The right side of the condition is not a list: {:?}",
requested
"The right side of the condition is not a list: {requested:?}"
)));
};
Ok(requested.contains(provided))
@@ -185,14 +183,12 @@ impl FeatureStore {
let PostHogFlagFilterPropertyValue::String(requested) = requested else {
// Right should be a string
return Err(PostHogEvaluationError::Internal(format!(
"The right side of the condition is not a string: {:?}",
requested
"The right side of the condition is not a string: {requested:?}"
)));
};
let Ok(requested) = requested.parse::<f64>() else {
return Err(PostHogEvaluationError::Internal(format!(
"Can not parse the right side of the condition as a number: {:?}",
requested
"Can not parse the right side of the condition as a number: {requested:?}"
)));
};
// Left can either be a number or a string
@@ -201,16 +197,14 @@ impl FeatureStore {
PostHogFlagFilterPropertyValue::String(provided) => {
let Ok(provided) = provided.parse::<f64>() else {
return Err(PostHogEvaluationError::Internal(format!(
"Can not parse the left side of the condition as a number: {:?}",
provided
"Can not parse the left side of the condition as a number: {provided:?}"
)));
};
provided
}
_ => {
return Err(PostHogEvaluationError::Internal(format!(
"The left side of the condition is not a number or a string: {:?}",
provided
"The left side of the condition is not a number or a string: {provided:?}"
)));
}
};
@@ -218,14 +212,12 @@ impl FeatureStore {
"lt" => Ok(provided < requested),
"gt" => Ok(provided > requested),
op => Err(PostHogEvaluationError::Internal(format!(
"Unsupported operator: {}",
op
"Unsupported operator: {op}"
))),
}
}
_ => Err(PostHogEvaluationError::Internal(format!(
"Unsupported operator: {}",
operator
"Unsupported operator: {operator}"
))),
}
}
@@ -373,8 +365,7 @@ impl FeatureStore {
if let Some(flag_config) = self.flags.get(flag_key) {
if !flag_config.active {
return Err(PostHogEvaluationError::NotAvailable(format!(
"The feature flag is not active: {}",
flag_key
"The feature flag is not active: {flag_key}"
)));
}
let Some(ref multivariate) = flag_config.filters.multivariate else {
@@ -401,8 +392,7 @@ impl FeatureStore {
// This should not happen because the rollout percentage always adds up to 100, but just in case that PostHog
// returned invalid spec, we return an error.
return Err(PostHogEvaluationError::Internal(format!(
"Rollout percentage does not add up to 100: {}",
flag_key
"Rollout percentage does not add up to 100: {flag_key}"
)));
}
GroupEvaluationResult::Unmatched => continue,
@@ -413,8 +403,7 @@ impl FeatureStore {
} else {
// The feature flag is not available yet
Err(PostHogEvaluationError::NotAvailable(format!(
"Not found in the local evaluation spec: {}",
flag_key
"Not found in the local evaluation spec: {flag_key}"
)))
}
}
@@ -440,8 +429,7 @@ impl FeatureStore {
if let Some(flag_config) = self.flags.get(flag_key) {
if !flag_config.active {
return Err(PostHogEvaluationError::NotAvailable(format!(
"The feature flag is not active: {}",
flag_key
"The feature flag is not active: {flag_key}"
)));
}
if flag_config.filters.multivariate.is_some() {
@@ -456,8 +444,7 @@ impl FeatureStore {
match self.evaluate_group(group, hash_on_global_rollout_percentage, properties)? {
GroupEvaluationResult::MatchedAndOverride(_) => {
return Err(PostHogEvaluationError::Internal(format!(
"Boolean flag cannot have overrides: {}",
flag_key
"Boolean flag cannot have overrides: {flag_key}"
)));
}
GroupEvaluationResult::MatchedAndEvaluate => {
@@ -471,8 +458,7 @@ impl FeatureStore {
} else {
// The feature flag is not available yet
Err(PostHogEvaluationError::NotAvailable(format!(
"Not found in the local evaluation spec: {}",
flag_key
"Not found in the local evaluation spec: {flag_key}"
)))
}
}
@@ -483,8 +469,7 @@ impl FeatureStore {
Ok(flag_config.filters.multivariate.is_none())
} else {
Err(PostHogEvaluationError::NotAvailable(format!(
"Not found in the local evaluation spec: {}",
flag_key
"Not found in the local evaluation spec: {flag_key}"
)))
}
}

View File

@@ -198,7 +198,7 @@ impl fmt::Display for CancelKeyData {
// This format is more compact and might work better for logs.
f.debug_tuple("CancelKeyData")
.field(&format_args!("{:x}", id))
.field(&format_args!("{id:x}"))
.finish()
}
}
@@ -291,8 +291,7 @@ impl FeMessage {
let len = (&buf[1..5]).read_u32::<BigEndian>().unwrap();
if len < 4 {
return Err(ProtocolError::Protocol(format!(
"invalid message length {}",
len
"invalid message length {len}"
)));
}
@@ -367,8 +366,7 @@ impl FeStartupPacket {
#[allow(clippy::manual_range_contains)]
if len < 8 || len > MAX_STARTUP_PACKET_LENGTH {
return Err(ProtocolError::Protocol(format!(
"invalid startup packet message length {}",
len
"invalid startup packet message length {len}"
)));
}

View File

@@ -308,7 +308,7 @@ impl ScramSha256 {
let verifier = match parsed {
ServerFinalMessage::Error(e) => {
return Err(io::Error::other(format!("SCRAM error: {}", e)));
return Err(io::Error::other(format!("SCRAM error: {e}")));
}
ServerFinalMessage::Verifier(verifier) => verifier,
};
@@ -343,10 +343,8 @@ impl<'a> Parser<'a> {
match self.it.next() {
Some((_, c)) if c == target => Ok(()),
Some((i, c)) => {
let m = format!(
"unexpected character at byte {}: expected `{}` but got `{}",
i, target, c
);
let m =
format!("unexpected character at byte {i}: expected `{target}` but got `{c}");
Err(io::Error::new(io::ErrorKind::InvalidInput, m))
}
None => Err(io::Error::new(
@@ -412,7 +410,7 @@ impl<'a> Parser<'a> {
match self.it.peek() {
Some(&(i, _)) => Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("unexpected trailing data at byte {}", i),
format!("unexpected trailing data at byte {i}"),
)),
None => Ok(()),
}

View File

@@ -211,7 +211,7 @@ impl Message {
tag => {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("unknown authentication tag `{}`", tag),
format!("unknown authentication tag `{tag}`"),
));
}
},
@@ -238,7 +238,7 @@ impl Message {
tag => {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("unknown message tag `{}`", tag),
format!("unknown message tag `{tag}`"),
));
}
};

View File

@@ -46,7 +46,7 @@ impl fmt::Display for Type {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.schema() {
"public" | "pg_catalog" => {}
schema => write!(fmt, "{}.", schema)?,
schema => write!(fmt, "{schema}.")?,
}
fmt.write_str(self.name())
}

View File

@@ -12,7 +12,9 @@ use tokio::net::TcpStream;
use crate::connect::connect;
use crate::connect_raw::{RawConnection, connect_raw};
use crate::tls::{MakeTlsConnect, TlsConnect};
use crate::connect_tls::connect_tls;
use crate::maybe_tls_stream::MaybeTlsStream;
use crate::tls::{MakeTlsConnect, TlsConnect, TlsStream};
use crate::{Client, Connection, Error};
/// TLS configuration.
@@ -238,7 +240,7 @@ impl Config {
connect(tls, self).await
}
pub async fn connect_raw<S, T>(
pub async fn tls_and_authenticate<S, T>(
&self,
stream: S,
tls: T,
@@ -247,7 +249,19 @@ impl Config {
S: AsyncRead + AsyncWrite + Unpin,
T: TlsConnect<S>,
{
connect_raw(stream, tls, self).await
let stream = connect_tls(stream, self.ssl_mode, tls).await?;
connect_raw(stream, self).await
}
pub async fn authenticate<S, T>(
&self,
stream: MaybeTlsStream<S, T>,
) -> Result<RawConnection<S, T>, Error>
where
S: AsyncRead + AsyncWrite + Unpin,
T: TlsStream + Unpin,
{
connect_raw(stream, self).await
}
}

View File

@@ -1,14 +1,16 @@
use std::net::IpAddr;
use postgres_protocol2::message::backend::Message;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use crate::client::SocketConfig;
use crate::codec::BackendMessage;
use crate::config::Host;
use crate::config::{Host, SslMode};
use crate::connect_raw::connect_raw;
use crate::connect_socket::connect_socket;
use crate::connect_tls::connect_tls;
use crate::tls::{MakeTlsConnect, TlsConnect};
use crate::{Client, Config, Connection, Error, RawConnection};
@@ -44,13 +46,8 @@ where
T: TlsConnect<TcpStream>,
{
let socket = connect_socket(host_addr, host, port, config.connect_timeout).await?;
let RawConnection {
stream,
parameters,
delayed_notice,
process_id,
secret_key,
} = connect_raw(socket, tls, config).await?;
let stream = connect_tls(socket, config.ssl_mode, tls).await?;
let raw = connect_raw(stream, config).await?;
let socket_config = SocketConfig {
host_addr,
@@ -59,24 +56,46 @@ where
connect_timeout: config.connect_timeout,
};
let (client_tx, conn_rx) = mpsc::unbounded_channel();
let (conn_tx, client_rx) = mpsc::channel(4);
let client = Client::new(
client_tx,
client_rx,
socket_config,
config.ssl_mode,
process_id,
secret_key,
);
// delayed notices are always sent as "Async" messages.
let delayed = delayed_notice
.into_iter()
.map(|m| BackendMessage::Async(Message::NoticeResponse(m)))
.collect();
let connection = Connection::new(stream, delayed, parameters, conn_tx, conn_rx);
Ok((client, connection))
Ok(raw.into_managed_conn(socket_config, config.ssl_mode))
}
impl<S, T> RawConnection<S, T>
where
S: AsyncRead + AsyncWrite + Unpin,
T: AsyncRead + AsyncWrite + Unpin,
{
pub fn into_managed_conn(
self,
socket_config: SocketConfig,
ssl_mode: SslMode,
) -> (Client, Connection<S, T>) {
let RawConnection {
stream,
parameters,
delayed_notice,
process_id,
secret_key,
} = self;
let (client_tx, conn_rx) = mpsc::unbounded_channel();
let (conn_tx, client_rx) = mpsc::channel(4);
let client = Client::new(
client_tx,
client_rx,
socket_config,
ssl_mode,
process_id,
secret_key,
);
// delayed notices are always sent as "Async" messages.
let delayed = delayed_notice
.into_iter()
.map(|m| BackendMessage::Async(Message::NoticeResponse(m)))
.collect();
let connection = Connection::new(stream, delayed, parameters, conn_tx, conn_rx);
(client, connection)
}
}

View File

@@ -16,9 +16,8 @@ use tokio_util::codec::Framed;
use crate::Error;
use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec};
use crate::config::{self, AuthKeys, Config};
use crate::connect_tls::connect_tls;
use crate::maybe_tls_stream::MaybeTlsStream;
use crate::tls::{TlsConnect, TlsStream};
use crate::tls::TlsStream;
pub struct StartupStream<S, T> {
inner: Framed<MaybeTlsStream<S, T>, PostgresCodec>,
@@ -87,16 +86,13 @@ pub struct RawConnection<S, T> {
}
pub async fn connect_raw<S, T>(
stream: S,
tls: T,
stream: MaybeTlsStream<S, T>,
config: &Config,
) -> Result<RawConnection<S, T::Stream>, Error>
) -> Result<RawConnection<S, T>, Error>
where
S: AsyncRead + AsyncWrite + Unpin,
T: TlsConnect<S>,
T: TlsStream + Unpin,
{
let stream = connect_tls(stream, config.ssl_mode, tls).await?;
let mut stream = StartupStream {
inner: Framed::new(stream, PostgresCodec),
buf: BackendMessages::empty(),

View File

@@ -332,10 +332,10 @@ impl fmt::Display for DbError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "{}: {}", self.severity, self.message)?;
if let Some(detail) = &self.detail {
write!(fmt, "\nDETAIL: {}", detail)?;
write!(fmt, "\nDETAIL: {detail}")?;
}
if let Some(hint) = &self.hint {
write!(fmt, "\nHINT: {}", hint)?;
write!(fmt, "\nHINT: {hint}")?;
}
Ok(())
}
@@ -398,9 +398,9 @@ impl fmt::Display for Error {
Kind::Io => fmt.write_str("error communicating with the server")?,
Kind::UnexpectedMessage => fmt.write_str("unexpected message from server")?,
Kind::Tls => fmt.write_str("error performing TLS handshake")?,
Kind::ToSql(idx) => write!(fmt, "error serializing parameter {}", idx)?,
Kind::FromSql(idx) => write!(fmt, "error deserializing column {}", idx)?,
Kind::Column(column) => write!(fmt, "invalid column `{}`", column)?,
Kind::ToSql(idx) => write!(fmt, "error serializing parameter {idx}")?,
Kind::FromSql(idx) => write!(fmt, "error deserializing column {idx}")?,
Kind::Column(column) => write!(fmt, "invalid column `{column}`")?,
Kind::Closed => fmt.write_str("connection closed")?,
Kind::Db => fmt.write_str("db error")?,
Kind::Parse => fmt.write_str("error parsing response from server")?,
@@ -411,7 +411,7 @@ impl fmt::Display for Error {
Kind::Timeout => fmt.write_str("timeout waiting for server")?,
};
if let Some(ref cause) = self.0.cause {
write!(fmt, ": {}", cause)?;
write!(fmt, ": {cause}")?;
}
Ok(())
}

View File

@@ -156,7 +156,7 @@ impl Row {
{
match self.get_inner(&idx) {
Ok(ok) => ok,
Err(err) => panic!("error retrieving column {}: {}", idx, err),
Err(err) => panic!("error retrieving column {idx}: {err}"),
}
}
@@ -274,7 +274,7 @@ impl SimpleQueryRow {
{
match self.get_inner(&idx) {
Ok(ok) => ok,
Err(err) => panic!("error retrieving column {}: {}", idx, err),
Err(err) => panic!("error retrieving column {idx}: {err}"),
}
}

View File

@@ -400,7 +400,7 @@ impl RemoteStorage for LocalFs {
key
};
let relative_key = format!("{}", relative_key);
let relative_key = format!("{relative_key}");
if relative_key.contains(REMOTE_STORAGE_PREFIX_SEPARATOR) {
let first_part = relative_key
.split(REMOTE_STORAGE_PREFIX_SEPARATOR)
@@ -594,13 +594,9 @@ impl RemoteStorage for LocalFs {
let from_path = from.with_base(&self.storage_root);
let to_path = to.with_base(&self.storage_root);
create_target_directory(&to_path).await?;
fs::copy(&from_path, &to_path).await.with_context(|| {
format!(
"Failed to copy file from '{from_path}' to '{to_path}'",
from_path = from_path,
to_path = to_path
)
})?;
fs::copy(&from_path, &to_path)
.await
.with_context(|| format!("Failed to copy file from '{from_path}' to '{to_path}'"))?;
Ok(())
}
@@ -1183,7 +1179,7 @@ mod fs_tests {
.write(true)
.create_new(true)
.open(path)?;
write!(file_for_writing, "{}", contents)?;
write!(file_for_writing, "{contents}")?;
drop(file_for_writing);
let file_size = path.metadata()?.len() as usize;
Ok((

View File

@@ -193,10 +193,10 @@ mod tests {
})
.unwrap();
println!("members: {}", members);
println!("members: {members}");
let j = serde_json::to_string(&members).expect("failed to serialize");
println!("members json: {}", j);
println!("members json: {j}");
assert_eq!(
j,
r#"[{"id":42,"host":"lala.org","pg_port":5432},{"id":43,"host":"bubu.org","pg_port":5432}]"#

View File

@@ -41,7 +41,7 @@ pub fn report_compact_sources<E: std::error::Error>(e: &E) -> impl std::fmt::Dis
// why is E a generic parameter here? hope that rustc will see through a default
// Error::source implementation and leave the following out if there cannot be any
// sources:
Sources(self.0.source()).try_for_each(|src| write!(f, ": {}", src))
Sources(self.0.source()).try_for_each(|src| write!(f, ": {src}"))
}
}

View File

@@ -135,7 +135,7 @@ impl Debug for Generation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Valid(v) => {
write!(f, "{:08x}", v)
write!(f, "{v:08x}")
}
Self::None => {
write!(f, "<none>")

View File

@@ -280,7 +280,7 @@ impl TryFrom<Option<&str>> for TimelineId {
value
.unwrap_or_default()
.parse::<TimelineId>()
.with_context(|| format!("Could not parse timeline id from {:?}", value))
.with_context(|| format!("Could not parse timeline id from {value:?}"))
}
}

View File

@@ -89,7 +89,7 @@ pub fn wal_stream_connection_config(
.set_password(args.auth_token.map(|s| s.to_owned()));
if let Some(availability_zone) = args.availability_zone {
connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]);
connstr = connstr.extend_options([format!("availability_zone={availability_zone}")]);
}
Ok(connstr)

View File

@@ -196,7 +196,7 @@ impl std::fmt::Display for TenantShardId {
impl std::fmt::Debug for TenantShardId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Debug is the same as Display: the compact hex representation
write!(f, "{}", self)
write!(f, "{self}")
}
}
@@ -284,7 +284,7 @@ impl std::fmt::Display for ShardIndex {
impl std::fmt::Debug for ShardIndex {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Debug is the same as Display: the compact hex representation
write!(f, "{}", self)
write!(f, "{self}")
}
}

View File

@@ -29,7 +29,7 @@ impl ShutdownSignals {
SIGINT => Signal::Interrupt,
SIGTERM => Signal::Terminate,
SIGQUIT => Signal::Quit,
other => panic!("unknown signal: {}", other),
other => panic!("unknown signal: {other}"),
};
handler(signal)?;

View File

@@ -90,8 +90,7 @@ impl Dispatcher {
Err(e) => {
sink.send(Message::Text(Utf8Bytes::from(
serde_json::to_string(&ProtocolResponse::Error(format!(
"Received protocol version range {} which does not overlap with {}",
agent_range, monitor_range
"Received protocol version range {agent_range} which does not overlap with {monitor_range}"
)))
.unwrap(),
)))

View File

@@ -285,7 +285,7 @@ impl FileCacheState {
// why we're constructing the query here.
self.client
.query(
&format!("ALTER SYSTEM SET neon.file_cache_size_limit = {};", num_mb),
&format!("ALTER SYSTEM SET neon.file_cache_size_limit = {num_mb};"),
&[],
)
.await

View File

@@ -64,7 +64,7 @@ async fn download_bench_data(
let temp_dir_parent: Utf8PathBuf = env::current_dir().unwrap().try_into()?;
let temp_dir = camino_tempfile::tempdir_in(temp_dir_parent)?;
eprintln!("Downloading benchmark data to {:?}", temp_dir);
eprintln!("Downloading benchmark data to {temp_dir:?}");
let listing = client
.list(None, ListingMode::NoDelimiter, None, cancel)
@@ -120,7 +120,7 @@ struct BenchmarkMetadata {
}
async fn load_bench_data(path: &Utf8Path, input_size: usize) -> anyhow::Result<BenchmarkData> {
eprintln!("Loading benchmark data from {:?}", path);
eprintln!("Loading benchmark data from {path:?}");
let mut entries = tokio::fs::read_dir(path).await?;
let mut ordered_segment_paths = Vec::new();

View File

@@ -6,6 +6,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// the build then. Anyway, per cargo docs build script shouldn't output to
// anywhere but $OUT_DIR.
tonic_build::compile_protos("proto/interpreted_wal.proto")
.unwrap_or_else(|e| panic!("failed to compile protos {:?}", e));
.unwrap_or_else(|e| panic!("failed to compile protos {e:?}"));
Ok(())
}

View File

@@ -128,6 +128,6 @@ pub fn describe_wal_record(rec: &NeonWalRecord) -> Result<String, DeserializeErr
will_init,
describe_postgres_wal_record(rec)?
)),
_ => Ok(format!("{:?}", rec)),
_ => Ok(format!("{rec:?}")),
}
}

View File

@@ -376,7 +376,7 @@ impl Level {
FATAL => Level::Fatal,
PANIC => Level::Panic,
WPEVENT => Level::WPEvent,
_ => panic!("unknown log level {}", elevel),
_ => panic!("unknown log level {elevel}"),
}
}
}
@@ -446,7 +446,7 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
impl std::fmt::Display for Level {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{:?}", self)
write!(f, "{self:?}")
}
}

View File

@@ -380,7 +380,7 @@ mod tests {
}
fn conn_send_query(&self, _: &mut crate::bindings::Safekeeper, query: &str) -> bool {
println!("conn_send_query: {}", query);
println!("conn_send_query: {query}");
true
}
@@ -399,13 +399,13 @@ mod tests {
) -> crate::bindings::PGAsyncReadResult {
println!("conn_async_read");
let reply = self.next_safekeeper_reply();
println!("conn_async_read result: {:?}", reply);
println!("conn_async_read result: {reply:?}");
vec.extend_from_slice(reply);
crate::bindings::PGAsyncReadResult_PG_ASYNC_READ_SUCCESS
}
fn conn_blocking_write(&self, _: &mut crate::bindings::Safekeeper, buf: &[u8]) -> bool {
println!("conn_blocking_write: {:?}", buf);
println!("conn_blocking_write: {buf:?}");
self.check_walproposer_msg(buf);
true
}
@@ -456,10 +456,7 @@ mod tests {
timeout_millis: i64,
) -> super::WaitResult {
let data = self.wait_events.get();
println!(
"wait_event_set, timeout_millis={}, res={:?}",
timeout_millis, data
);
println!("wait_event_set, timeout_millis={timeout_millis}, res={data:?}");
super::WaitResult::Network(data.sk, data.event_mask)
}
@@ -475,7 +472,7 @@ mod tests {
}
fn log_internal(&self, _wp: &mut crate::bindings::WalProposer, level: Level, msg: &str) {
println!("wp_log[{}] {}", level, msg);
println!("wp_log[{level}] {msg}");
}
fn after_election(&self, _wp: &mut crate::bindings::WalProposer) {

View File

@@ -12,6 +12,9 @@ testing = ["fail/failpoints", "pageserver_api/testing", "wal_decoder/testing", "
fuzz-read-path = ["testing"]
# Enables benchmarking only APIs
benchmarking = []
[dependencies]
anyhow.workspace = true
arc-swap.workspace = true
@@ -127,6 +130,7 @@ harness = false
[[bench]]
name = "bench_ingest"
harness = false
required-features = ["benchmarking"]
[[bench]]
name = "upload_queue"

View File

@@ -1,22 +1,29 @@
use std::env;
use std::num::NonZeroUsize;
use std::sync::Arc;
use bytes::Bytes;
use camino::Utf8PathBuf;
use criterion::{Criterion, criterion_group, criterion_main};
use futures::stream::FuturesUnordered;
use pageserver::config::PageServerConf;
use pageserver::context::{DownloadBehavior, RequestContext};
use pageserver::keyspace::KeySpace;
use pageserver::l0_flush::{L0FlushConfig, L0FlushGlobalState};
use pageserver::task_mgr::TaskKind;
use pageserver::tenant::storage_layer::InMemoryLayer;
use pageserver::tenant::storage_layer::IoConcurrency;
use pageserver::tenant::storage_layer::{InMemoryLayer, ValuesReconstructState};
use pageserver::{page_cache, virtual_file};
use pageserver_api::config::GetVectoredConcurrentIo;
use pageserver_api::key::Key;
use pageserver_api::models::virtual_file::IoMode;
use pageserver_api::shard::TenantShardId;
use strum::IntoEnumIterator;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use utils::bin_ser::BeSer;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::sync::gate::Gate;
use wal_decoder::models::value::Value;
use wal_decoder::serialized_batch::SerializedValueBatch;
@@ -30,7 +37,7 @@ fn murmurhash32(mut h: u32) -> u32 {
h
}
#[derive(serde::Serialize, Clone, Copy, Debug)]
#[derive(serde::Serialize, Clone, Copy, Debug, PartialEq)]
enum KeyLayout {
/// Sequential unique keys
Sequential,
@@ -40,19 +47,30 @@ enum KeyLayout {
RandomReuse(u32),
}
#[derive(serde::Serialize, Clone, Copy, Debug)]
#[derive(serde::Serialize, Clone, Copy, Debug, PartialEq)]
enum WriteDelta {
Yes,
No,
}
#[derive(serde::Serialize, Clone, Copy, Debug, PartialEq)]
enum ConcurrentReads {
Yes,
No,
}
async fn ingest(
conf: &'static PageServerConf,
put_size: usize,
put_count: usize,
key_layout: KeyLayout,
write_delta: WriteDelta,
concurrent_reads: ConcurrentReads,
) -> anyhow::Result<()> {
if concurrent_reads == ConcurrentReads::Yes {
assert_eq!(key_layout, KeyLayout::Sequential);
}
let mut lsn = utils::lsn::Lsn(1000);
let mut key = Key::from_i128(0x0);
@@ -68,16 +86,18 @@ async fn ingest(
let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
let layer = InMemoryLayer::create(
conf,
timeline_id,
tenant_shard_id,
lsn,
&gate,
&cancel,
&ctx,
)
.await?;
let layer = Arc::new(
InMemoryLayer::create(
conf,
timeline_id,
tenant_shard_id,
lsn,
&gate,
&cancel,
&ctx,
)
.await?,
);
let data = Value::Image(Bytes::from(vec![0u8; put_size]));
let data_ser_size = data.serialized_size().unwrap() as usize;
@@ -86,6 +106,61 @@ async fn ingest(
pageserver::context::DownloadBehavior::Download,
);
const READ_BATCH_SIZE: u32 = 32;
let (tx, mut rx) = tokio::sync::watch::channel::<Option<Key>>(None);
let reader_cancel = CancellationToken::new();
let reader_handle = if concurrent_reads == ConcurrentReads::Yes {
Some(tokio::task::spawn({
let cancel = reader_cancel.clone();
let layer = layer.clone();
let ctx = ctx.attached_child();
async move {
let gate = Gate::default();
let gate_guard = gate.enter().unwrap();
let io_concurrency = IoConcurrency::spawn_from_conf(
GetVectoredConcurrentIo::SidecarTask,
gate_guard,
);
rx.wait_for(|key| key.is_some()).await.unwrap();
while !cancel.is_cancelled() {
let key = match *rx.borrow() {
Some(some) => some,
None => unreachable!(),
};
let mut start_key = key;
start_key.field6 = key.field6.saturating_sub(READ_BATCH_SIZE);
let key_range = start_key..key.next();
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency.clone());
layer
.get_values_reconstruct_data(
KeySpace::single(key_range),
Lsn(1)..Lsn(u64::MAX),
&mut reconstruct_state,
&ctx,
)
.await
.unwrap();
let mut collect_futs = std::mem::take(&mut reconstruct_state.keys)
.into_values()
.map(|state| state.sink_pending_ios())
.collect::<FuturesUnordered<_>>();
while collect_futs.next().await.is_some() {}
}
drop(io_concurrency);
gate.close().await;
}
}))
} else {
None
};
const BATCH_SIZE: usize = 16;
let mut batch = Vec::new();
@@ -113,19 +188,27 @@ async fn ingest(
batch.push((key.to_compact(), lsn, data_ser_size, data.clone()));
if batch.len() >= BATCH_SIZE {
let last_key = Key::from_compact(batch.last().unwrap().0);
let this_batch = std::mem::take(&mut batch);
let serialized = SerializedValueBatch::from_values(this_batch);
layer.put_batch(serialized, &ctx).await?;
tx.send(Some(last_key)).unwrap();
}
}
if !batch.is_empty() {
let last_key = Key::from_compact(batch.last().unwrap().0);
let this_batch = std::mem::take(&mut batch);
let serialized = SerializedValueBatch::from_values(this_batch);
layer.put_batch(serialized, &ctx).await?;
tx.send(Some(last_key)).unwrap();
}
layer.freeze(lsn + 1).await;
if matches!(write_delta, WriteDelta::Yes) {
if write_delta == WriteDelta::Yes {
let l0_flush_state = L0FlushGlobalState::new(L0FlushConfig::Direct {
max_concurrency: NonZeroUsize::new(1).unwrap(),
});
@@ -136,6 +219,11 @@ async fn ingest(
tokio::fs::remove_file(path).await?;
}
reader_cancel.cancel();
if let Some(handle) = reader_handle {
handle.await.unwrap();
}
Ok(())
}
@@ -147,6 +235,7 @@ fn ingest_main(
put_count: usize,
key_layout: KeyLayout,
write_delta: WriteDelta,
concurrent_reads: ConcurrentReads,
) {
pageserver::virtual_file::set_io_mode(io_mode);
@@ -156,7 +245,15 @@ fn ingest_main(
.unwrap();
runtime.block_on(async move {
let r = ingest(conf, put_size, put_count, key_layout, write_delta).await;
let r = ingest(
conf,
put_size,
put_count,
key_layout,
write_delta,
concurrent_reads,
)
.await;
if let Err(e) = r {
panic!("{e:?}");
}
@@ -195,6 +292,7 @@ fn criterion_benchmark(c: &mut Criterion) {
key_size: usize,
key_layout: KeyLayout,
write_delta: WriteDelta,
concurrent_reads: ConcurrentReads,
}
#[derive(Clone)]
struct HandPickedParameters {
@@ -245,7 +343,7 @@ fn criterion_benchmark(c: &mut Criterion) {
];
let exploded_parameters = {
let mut out = Vec::new();
for io_mode in IoMode::iter() {
for concurrent_reads in [ConcurrentReads::Yes, ConcurrentReads::No] {
for param in expect.clone() {
let HandPickedParameters {
volume_mib,
@@ -253,12 +351,18 @@ fn criterion_benchmark(c: &mut Criterion) {
key_layout,
write_delta,
} = param;
if key_layout != KeyLayout::Sequential && concurrent_reads == ConcurrentReads::Yes {
continue;
}
out.push(ExplodedParameters {
io_mode,
io_mode: IoMode::DirectRw,
volume_mib,
key_size,
key_layout,
write_delta,
concurrent_reads,
});
}
}
@@ -272,9 +376,10 @@ fn criterion_benchmark(c: &mut Criterion) {
key_size,
key_layout,
write_delta,
concurrent_reads,
} = self;
format!(
"io_mode={io_mode:?} volume_mib={volume_mib:?} key_size_bytes={key_size:?} key_layout={key_layout:?} write_delta={write_delta:?}"
"io_mode={io_mode:?} volume_mib={volume_mib:?} key_size_bytes={key_size:?} key_layout={key_layout:?} write_delta={write_delta:?} concurrent_reads={concurrent_reads:?}"
)
}
}
@@ -287,12 +392,23 @@ fn criterion_benchmark(c: &mut Criterion) {
key_size,
key_layout,
write_delta,
concurrent_reads,
} = params;
let put_count = volume_mib * 1024 * 1024 / key_size;
group.throughput(criterion::Throughput::Bytes((key_size * put_count) as u64));
group.sample_size(10);
group.bench_function(id, |b| {
b.iter(|| ingest_main(conf, io_mode, key_size, put_count, key_layout, write_delta))
b.iter(|| {
ingest_main(
conf,
io_mode,
key_size,
put_count,
key_layout,
write_delta,
concurrent_reads,
)
})
});
}
}

View File

@@ -509,11 +509,11 @@ impl Client {
.expect("Cannot build URL");
path.query_pairs_mut()
.append_pair("recurse", &format!("{}", recurse));
.append_pair("recurse", &format!("{recurse}"));
if let Some(concurrency) = concurrency {
path.query_pairs_mut()
.append_pair("concurrency", &format!("{}", concurrency));
.append_pair("concurrency", &format!("{concurrency}"));
}
self.request(Method::POST, path, ()).await.map(|_| ())

View File

@@ -152,7 +152,7 @@ pub fn draw_history<W: std::io::Write>(history: &[LayerTraceEvent], mut output:
let key_diff = key_end - key_start;
if key_start >= key_end {
panic!("Invalid key range {}-{}", key_start, key_end);
panic!("Invalid key range {key_start}-{key_end}");
}
let lsn_start = lsn_map.map(f.lsn_range.start);
@@ -212,12 +212,12 @@ pub fn draw_history<W: std::io::Write>(history: &[LayerTraceEvent], mut output:
)?;
writeln!(svg, "</line>")?;
}
Ordering::Greater => panic!("Invalid lsn range {}-{}", lsn_start, lsn_end),
Ordering::Greater => panic!("Invalid lsn range {lsn_start}-{lsn_end}"),
}
files_seen.insert(f);
}
writeln!(svg, "{}", EndSvg)?;
writeln!(svg, "{EndSvg}")?;
let mut layer_events_str = String::new();
let mut first = true;

View File

@@ -228,7 +228,7 @@ pub fn main() -> Result<()> {
let lsn_max = lsn_map.len();
if key_start >= key_end {
panic!("Invalid key range {}-{}", key_start, key_end);
panic!("Invalid key range {key_start}-{key_end}");
}
let lsn_start = *lsn_map.get(&lsnr.start).unwrap();
@@ -250,7 +250,7 @@ pub fn main() -> Result<()> {
ymargin = 0.05;
fill = Fill::Color(rgb(0, 0, 0));
}
Ordering::Greater => panic!("Invalid lsn range {}-{}", lsn_start, lsn_end),
Ordering::Greater => panic!("Invalid lsn range {lsn_start}-{lsn_end}"),
}
println!(
@@ -287,10 +287,10 @@ pub fn main() -> Result<()> {
);
}
println!("{}", EndSvg);
println!("{EndSvg}");
eprintln!("num_images: {}", num_images);
eprintln!("num_deltas: {}", num_deltas);
eprintln!("num_images: {num_images}");
eprintln!("num_deltas: {num_deltas}");
Ok(())
}

View File

@@ -372,7 +372,7 @@ impl<const N: usize> std::fmt::Debug for RelTagish<N> {
f.write_char('/')?;
}
first = false;
write!(f, "{}", x)
write!(f, "{x}")
})
}
}

View File

@@ -224,8 +224,7 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
}
}
println!(
"Total delta layers {} image layers {} excess layers {}",
total_delta_layers, total_image_layers, total_excess_layers
"Total delta layers {total_delta_layers} image layers {total_image_layers} excess layers {total_excess_layers}"
);
Ok(())
}

View File

@@ -131,7 +131,7 @@ impl Client {
let domain_stream = response_stream.map(|chunk_res| {
chunk_res.and_then(|proto_chunk| {
proto_chunk.try_into().map_err(|e| {
tonic::Status::internal(format!("Failed to convert response chunk: {}", e))
tonic::Status::internal(format!("Failed to convert response chunk: {e}"))
})
})
});

View File

@@ -62,7 +62,7 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
let tenant_shard_id = TenantShardId::unsharded(timeline.tenant_id);
let timeline_id = timeline.timeline_id;
println!("operating on timeline {}", timeline);
println!("operating on timeline {timeline}");
mgmt_api_client
.set_tenant_config(&TenantConfigRequest {
@@ -75,8 +75,8 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
let items = (0..100)
.map(|id| {
(
format!("pg_logical/mappings/{:03}.{:03}", batch, id),
format!("{:08}", id),
format!("pg_logical/mappings/{batch:03}.{id:03}"),
format!("{id:08}"),
)
})
.collect::<HashMap<_, _>>();

View File

@@ -667,7 +667,7 @@ where
}
// Append dir path for each database
let path = format!("base/{}", dbnode);
let path = format!("base/{dbnode}");
let header = new_tar_header_dir(&path)?;
self.ar
.append(&header, io::empty())
@@ -675,7 +675,7 @@ where
.map_err(|e| BasebackupError::Client(e, "add_dbdir,base"))?;
if let Some(img) = relmap_img {
let dst_path = format!("base/{}/PG_VERSION", dbnode);
let dst_path = format!("base/{dbnode}/PG_VERSION");
let pg_version_str = self.timeline.pg_version.versionfile_string();
let header = new_tar_header(&dst_path, pg_version_str.len() as u64)?;
@@ -684,7 +684,7 @@ where
.await
.map_err(|e| BasebackupError::Client(e, "add_dbdir,base/PG_VERSION"))?;
let relmap_path = format!("base/{}/pg_filenode.map", dbnode);
let relmap_path = format!("base/{dbnode}/pg_filenode.map");
let header = new_tar_header(&relmap_path, img.len() as u64)?;
self.ar
.append(&header, &img[..])
@@ -709,9 +709,9 @@ where
let crc = crc32c::crc32c(&img[..]);
buf.put_u32_le(crc);
let path = if self.timeline.pg_version < PgMajorVersion::PG17 {
format!("pg_twophase/{:>08X}", xid)
format!("pg_twophase/{xid:>08X}")
} else {
format!("pg_twophase/{:>016X}", xid)
format!("pg_twophase/{xid:>016X}")
};
let header = new_tar_header(&path, buf.len() as u64)?;
self.ar
@@ -763,7 +763,7 @@ where
//send wal segment
let segno = self.lsn.segment_number(WAL_SEGMENT_SIZE);
let wal_file_name = XLogFileName(PG_TLI, segno, WAL_SEGMENT_SIZE);
let wal_file_path = format!("pg_wal/{}", wal_file_name);
let wal_file_path = format!("pg_wal/{wal_file_name}");
let header = new_tar_header(&wal_file_path, WAL_SEGMENT_SIZE as u64)?;
let wal_seg = postgres_ffi::generate_wal_segment(

View File

@@ -583,7 +583,7 @@ fn start_pageserver(
deletion_queue_client,
l0_flush_global_state,
basebackup_prepare_sender,
feature_resolver,
feature_resolver: feature_resolver.clone(),
},
shutdown_pageserver.clone(),
);
@@ -715,6 +715,7 @@ fn start_pageserver(
disk_usage_eviction_state,
deletion_queue.new_client(),
secondary_controller,
feature_resolver,
)
.context("Failed to initialize router state")?,
);

View File

@@ -1,5 +1,6 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use arc_swap::ArcSwap;
use pageserver_api::config::NodeMetadata;
use posthog_client_lite::{
CaptureEvent, FeatureResolverBackgroundLoop, PostHogClientConfig, PostHogEvaluationError,
@@ -12,10 +13,13 @@ use utils::id::TenantId;
use crate::{config::PageServerConf, metrics::FEATURE_FLAG_EVALUATION};
const DEFAULT_POSTHOG_REFRESH_INTERVAL: Duration = Duration::from_secs(600);
#[derive(Clone)]
pub struct FeatureResolver {
inner: Option<Arc<FeatureResolverBackgroundLoop>>,
internal_properties: Option<Arc<HashMap<String, PostHogFlagFilterPropertyValue>>>,
force_overrides_for_testing: Arc<ArcSwap<HashMap<String, String>>>,
}
impl FeatureResolver {
@@ -23,6 +27,7 @@ impl FeatureResolver {
Self {
inner: None,
internal_properties: None,
force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
}
}
@@ -139,18 +144,23 @@ impl FeatureResolver {
}
tenants
};
// TODO: make refresh period configurable
inner
.clone()
.spawn(handle, Duration::from_secs(60), fake_tenants);
inner.clone().spawn(
handle,
posthog_config
.refresh_interval
.unwrap_or(DEFAULT_POSTHOG_REFRESH_INTERVAL),
fake_tenants,
);
Ok(FeatureResolver {
inner: Some(inner),
internal_properties: Some(internal_properties),
force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
})
} else {
Ok(FeatureResolver {
inner: None,
internal_properties: None,
force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
})
}
}
@@ -190,6 +200,11 @@ impl FeatureResolver {
flag_key: &str,
tenant_id: TenantId,
) -> Result<String, PostHogEvaluationError> {
let force_overrides = self.force_overrides_for_testing.load();
if let Some(value) = force_overrides.get(flag_key) {
return Ok(value.clone());
}
if let Some(inner) = &self.inner {
let res = inner.feature_store().evaluate_multivariate(
flag_key,
@@ -228,6 +243,15 @@ impl FeatureResolver {
flag_key: &str,
tenant_id: TenantId,
) -> Result<(), PostHogEvaluationError> {
let force_overrides = self.force_overrides_for_testing.load();
if let Some(value) = force_overrides.get(flag_key) {
return if value == "true" {
Ok(())
} else {
Err(PostHogEvaluationError::NoConditionGroupMatched)
};
}
if let Some(inner) = &self.inner {
let res = inner.feature_store().evaluate_boolean(
flag_key,
@@ -259,8 +283,22 @@ impl FeatureResolver {
inner.feature_store().is_feature_flag_boolean(flag_key)
} else {
Err(PostHogEvaluationError::NotAvailable(
"PostHog integration is not enabled".to_string(),
"PostHog integration is not enabled, cannot auto-determine the flag type"
.to_string(),
))
}
}
/// Force override a feature flag for testing. This is only for testing purposes. Assume the caller only call it
/// from a single thread so it won't race.
pub fn force_override_for_testing(&self, flag_key: &str, value: Option<&str>) {
let mut force_overrides = self.force_overrides_for_testing.load().as_ref().clone();
if let Some(value) = value {
force_overrides.insert(flag_key.to_string(), value.to_string());
} else {
force_overrides.remove(flag_key);
}
self.force_overrides_for_testing
.store(Arc::new(force_overrides));
}
}

View File

@@ -60,6 +60,7 @@ use crate::config::PageServerConf;
use crate::context;
use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
use crate::deletion_queue::DeletionQueueClient;
use crate::feature_resolver::FeatureResolver;
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
use crate::tenant::config::LocationConf;
@@ -108,6 +109,7 @@ pub struct State {
deletion_queue_client: DeletionQueueClient,
secondary_controller: SecondaryController,
latest_utilization: tokio::sync::Mutex<Option<(std::time::Instant, bytes::Bytes)>>,
feature_resolver: FeatureResolver,
}
impl State {
@@ -121,6 +123,7 @@ impl State {
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
secondary_controller: SecondaryController,
feature_resolver: FeatureResolver,
) -> anyhow::Result<Self> {
let allowlist_routes = &[
"/v1/status",
@@ -141,6 +144,7 @@ impl State {
deletion_queue_client,
secondary_controller,
latest_utilization: Default::default(),
feature_resolver,
})
}
}
@@ -284,11 +288,11 @@ impl From<GetActiveTenantError> for ApiError {
GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
ApiError::ShuttingDown
}
GetActiveTenantError::WillNotBecomeActive(_) => ApiError::Conflict(format!("{}", e)),
GetActiveTenantError::WillNotBecomeActive(_) => ApiError::Conflict(format!("{e}")),
GetActiveTenantError::Cancelled => ApiError::ShuttingDown,
GetActiveTenantError::NotFound(gte) => gte.into(),
GetActiveTenantError::WaitForActiveTimeout { .. } => {
ApiError::ResourceUnavailable(format!("{}", e).into())
ApiError::ResourceUnavailable(format!("{e}").into())
}
GetActiveTenantError::SwitchedTenant => {
// in our HTTP handlers, this error doesn't happen
@@ -1012,7 +1016,7 @@ async fn get_lsn_by_timestamp_handler(
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let timestamp_raw = must_get_query_param(&request, "timestamp")?;
let timestamp = humantime::parse_rfc3339(&timestamp_raw)
.with_context(|| format!("Invalid time: {:?}", timestamp_raw))
.with_context(|| format!("Invalid time: {timestamp_raw:?}"))
.map_err(ApiError::BadRequest)?;
let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp);
@@ -1107,7 +1111,7 @@ async fn get_timestamp_of_lsn_handler(
json_response(StatusCode::OK, time)
}
None => Err(ApiError::PreconditionFailed(
format!("Timestamp for lsn {} not found", lsn).into(),
format!("Timestamp for lsn {lsn} not found").into(),
)),
}
}
@@ -2418,7 +2422,7 @@ async fn timeline_offload_handler(
}
if let (false, reason) = timeline.can_offload() {
return Err(ApiError::PreconditionFailed(
format!("Timeline::can_offload() check failed: {}", reason) .into(),
format!("Timeline::can_offload() check failed: {reason}") .into(),
));
}
offload_timeline(&tenant, &timeline)
@@ -3676,8 +3680,8 @@ async fn tenant_evaluate_feature_flag(
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let flag: String = must_parse_query_param(&request, "flag")?;
let as_type: String = must_parse_query_param(&request, "as")?;
let flag: String = parse_request_param(&request, "flag_key")?;
let as_type: Option<String> = parse_query_param(&request, "as")?;
let state = get_state(&request);
@@ -3686,11 +3690,11 @@ async fn tenant_evaluate_feature_flag(
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
let properties = tenant.feature_resolver.collect_properties(tenant_shard_id.tenant_id);
if as_type == "boolean" {
if as_type.as_deref() == Some("boolean") {
let result = tenant.feature_resolver.evaluate_boolean(&flag, tenant_shard_id.tenant_id);
let result = result.map(|_| true).map_err(|e| e.to_string());
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
} else if as_type == "multivariate" {
} else if as_type.as_deref() == Some("multivariate") {
let result = tenant.feature_resolver.evaluate_multivariate(&flag, tenant_shard_id.tenant_id).map_err(|e| e.to_string());
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
} else {
@@ -3710,6 +3714,35 @@ async fn tenant_evaluate_feature_flag(
.await
}
async fn force_override_feature_flag_for_testing_put(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
let flag: String = parse_request_param(&request, "flag_key")?;
let value: String = must_parse_query_param(&request, "value")?;
let state = get_state(&request);
state
.feature_resolver
.force_override_for_testing(&flag, Some(&value));
json_response(StatusCode::OK, ())
}
async fn force_override_feature_flag_for_testing_delete(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
let flag: String = parse_request_param(&request, "flag_key")?;
let state = get_state(&request);
state
.feature_resolver
.force_override_for_testing(&flag, None);
json_response(StatusCode::OK, ())
}
/// Common functionality of all the HTTP API handlers.
///
/// - Adds a tracing span to each request (by `request_span`)
@@ -4086,8 +4119,14 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/activate_post_import",
|r| api_handler(r, activate_post_import_handler),
)
.get("/v1/tenant/:tenant_shard_id/feature_flag", |r| {
.get("/v1/tenant/:tenant_shard_id/feature_flag/:flag_key", |r| {
api_handler(r, tenant_evaluate_feature_flag)
})
.put("/v1/feature_flag/:flag_key", |r| {
testing_api_handler("force override feature flag - put", r, force_override_feature_flag_for_testing_put)
})
.delete("/v1/feature_flag/:flag_key", |r| {
testing_api_handler("force override feature flag - delete", r, force_override_feature_flag_for_testing_delete)
})
.any(handler_404))
}

View File

@@ -1727,12 +1727,7 @@ impl Drop for SmgrOpTimer {
impl SmgrOpFlushInProgress {
/// The caller must guarantee that `socket_fd`` outlives this function.
pub(crate) async fn measure<Fut, O>(
self,
started_at: Instant,
mut fut: Fut,
socket_fd: RawFd,
) -> O
pub(crate) async fn measure<Fut, O>(self, started_at: Instant, fut: Fut, socket_fd: RawFd) -> O
where
Fut: std::future::Future<Output = O>,
{
@@ -3426,7 +3421,7 @@ impl TimelineMetrics {
pub fn dec_frozen_layer(&self, layer: &InMemoryLayer) {
assert!(matches!(layer.info(), InMemoryLayerInfo::Frozen { .. }));
let labels = self.make_frozen_layer_labels(layer);
let size = layer.try_len().expect("frozen layer should have no writer");
let size = layer.len();
TIMELINE_LAYER_COUNT
.get_metric_with_label_values(&labels)
.unwrap()
@@ -3441,7 +3436,7 @@ impl TimelineMetrics {
pub fn inc_frozen_layer(&self, layer: &InMemoryLayer) {
assert!(matches!(layer.info(), InMemoryLayerInfo::Frozen { .. }));
let labels = self.make_frozen_layer_labels(layer);
let size = layer.try_len().expect("frozen layer should have no writer");
let size = layer.len();
TIMELINE_LAYER_COUNT
.get_metric_with_label_values(&labels)
.unwrap()

View File

@@ -392,16 +392,14 @@ async fn page_service_conn_main(
} else {
let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
Err(io_error).context(format!(
"Postgres connection error for tenant_id={:?} client at peer_addr={}",
tenant_id, peer_addr
"Postgres connection error for tenant_id={tenant_id:?} client at peer_addr={peer_addr}"
))
}
}
other => {
let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
other.context(format!(
"Postgres query error for tenant_id={:?} client peer_addr={}",
tenant_id, peer_addr
"Postgres query error for tenant_id={tenant_id:?} client peer_addr={peer_addr}"
))
}
}
@@ -2140,8 +2138,7 @@ impl PageServerHandler {
if request_lsn < not_modified_since {
return Err(PageStreamError::BadRequest(
format!(
"invalid request with request LSN {} and not_modified_since {}",
request_lsn, not_modified_since,
"invalid request with request LSN {request_lsn} and not_modified_since {not_modified_since}",
)
.into(),
));
@@ -3544,8 +3541,9 @@ impl proto::PageService for GrpcPageServiceHandler {
&self,
req: tonic::Request<proto::GetBaseBackupRequest>,
) -> Result<tonic::Response<Self::GetBaseBackupStream>, tonic::Status> {
// Send 64 KB chunks to avoid large memory allocations.
const CHUNK_SIZE: usize = 64 * 1024;
// Send chunks of 256 KB to avoid large memory allocations. pagebench basebackup shows this
// to be the sweet spot where throughput is saturated.
const CHUNK_SIZE: usize = 256 * 1024;
let timeline = self.get_request_timeline(&req).await?;
let ctx = self.ctx.with_scope_timeline(&timeline);

View File

@@ -1185,7 +1185,7 @@ impl Timeline {
}
let origin_id = k.field6 as RepOriginId;
let origin_lsn = Lsn::des(&v)
.with_context(|| format!("decode replorigin value for {}: {v:?}", origin_id))?;
.with_context(|| format!("decode replorigin value for {origin_id}: {v:?}"))?;
if origin_lsn != Lsn::INVALID {
result.insert(origin_id, origin_lsn);
}
@@ -2440,8 +2440,7 @@ impl DatadirModification<'_> {
if path == p {
assert!(
modifying_file.is_none(),
"duplicated entries found for {}",
path
"duplicated entries found for {path}"
);
modifying_file = Some(content);
} else {

View File

@@ -3450,7 +3450,7 @@ impl TenantShard {
use pageserver_api::models::ActivatingFrom;
match &*current_state {
TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => {
panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state);
panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {current_state:?}");
}
TenantState::Attaching => {
*current_state = TenantState::Activating(ActivatingFrom::Attaching);
@@ -6617,7 +6617,7 @@ mod tests {
.put(
*TEST_KEY,
lsn,
&Value::Image(test_img(&format!("foo at {}", lsn))),
&Value::Image(test_img(&format!("foo at {lsn}"))),
ctx,
)
.await?;
@@ -6627,7 +6627,7 @@ mod tests {
.put(
*TEST_KEY,
lsn,
&Value::Image(test_img(&format!("foo at {}", lsn))),
&Value::Image(test_img(&format!("foo at {lsn}"))),
ctx,
)
.await?;
@@ -6641,7 +6641,7 @@ mod tests {
.put(
*TEST_KEY,
lsn,
&Value::Image(test_img(&format!("foo at {}", lsn))),
&Value::Image(test_img(&format!("foo at {lsn}"))),
ctx,
)
.await?;
@@ -6651,7 +6651,7 @@ mod tests {
.put(
*TEST_KEY,
lsn,
&Value::Image(test_img(&format!("foo at {}", lsn))),
&Value::Image(test_img(&format!("foo at {lsn}"))),
ctx,
)
.await?;
@@ -7150,7 +7150,7 @@ mod tests {
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
&Value::Image(test_img(&format!("{blknum} at {lsn}"))),
ctx,
)
.await?;
@@ -7438,7 +7438,7 @@ mod tests {
.put(
gap_at_key,
current_lsn,
&Value::Image(test_img(&format!("{} at {}", gap_at_key, current_lsn))),
&Value::Image(test_img(&format!("{gap_at_key} at {current_lsn}"))),
&ctx,
)
.await?;
@@ -7477,7 +7477,7 @@ mod tests {
.put(
current_key,
current_lsn,
&Value::Image(test_img(&format!("{} at {}", current_key, current_lsn))),
&Value::Image(test_img(&format!("{current_key} at {current_lsn}"))),
&ctx,
)
.await?;
@@ -7585,7 +7585,7 @@ mod tests {
while key < end_key {
current_lsn += 0x10;
let image_value = format!("{} at {}", child_gap_at_key, current_lsn);
let image_value = format!("{child_gap_at_key} at {current_lsn}");
let mut writer = parent_timeline.writer().await;
writer
@@ -7628,7 +7628,7 @@ mod tests {
.put(
key,
current_lsn,
&Value::Image(test_img(&format!("{} at {}", key, current_lsn))),
&Value::Image(test_img(&format!("{key} at {current_lsn}"))),
&ctx,
)
.await?;
@@ -7749,7 +7749,7 @@ mod tests {
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
&Value::Image(test_img(&format!("{blknum} at {lsn}"))),
&ctx,
)
.await?;
@@ -7770,7 +7770,7 @@ mod tests {
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
&Value::Image(test_img(&format!("{blknum} at {lsn}"))),
&ctx,
)
.await?;
@@ -7784,7 +7784,7 @@ mod tests {
test_key.field6 = blknum as u32;
assert_eq!(
tline.get(test_key, lsn, &ctx).await?,
test_img(&format!("{} at {}", blknum, last_lsn))
test_img(&format!("{blknum} at {last_lsn}"))
);
}
@@ -7830,7 +7830,7 @@ mod tests {
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
&Value::Image(test_img(&format!("{blknum} at {lsn}"))),
&ctx,
)
.await?;
@@ -7859,11 +7859,11 @@ mod tests {
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
&Value::Image(test_img(&format!("{blknum} at {lsn}"))),
&ctx,
)
.await?;
println!("updating {} at {}", blknum, lsn);
println!("updating {blknum} at {lsn}");
writer.finish_write(lsn);
drop(writer);
updated[blknum] = lsn;
@@ -7874,7 +7874,7 @@ mod tests {
test_key.field6 = blknum as u32;
assert_eq!(
tline.get(test_key, lsn, &ctx).await?,
test_img(&format!("{} at {}", blknum, last_lsn))
test_img(&format!("{blknum} at {last_lsn}"))
);
}
@@ -7927,11 +7927,11 @@ mod tests {
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} {} at {}", idx, blknum, lsn))),
&Value::Image(test_img(&format!("{idx} {blknum} at {lsn}"))),
&ctx,
)
.await?;
println!("updating [{}][{}] at {}", idx, blknum, lsn);
println!("updating [{idx}][{blknum}] at {lsn}");
writer.finish_write(lsn);
drop(writer);
updated[idx][blknum] = lsn;
@@ -8137,7 +8137,7 @@ mod tests {
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
&Value::Image(test_img(&format!("{blknum} at {lsn}"))),
&ctx,
)
.await?;
@@ -8154,7 +8154,7 @@ mod tests {
test_key.field6 = (blknum * STEP) as u32;
assert_eq!(
tline.get(test_key, lsn, &ctx).await?,
test_img(&format!("{} at {}", blknum, last_lsn))
test_img(&format!("{blknum} at {last_lsn}"))
);
}
@@ -8191,7 +8191,7 @@ mod tests {
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
&Value::Image(test_img(&format!("{blknum} at {lsn}"))),
&ctx,
)
.await?;
@@ -8444,7 +8444,7 @@ mod tests {
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
&Value::Image(test_img(&format!("{blknum} at {lsn}"))),
&ctx,
)
.await?;
@@ -8464,7 +8464,7 @@ mod tests {
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
&Value::Image(test_img(&format!("{blknum} at {lsn}"))),
&ctx,
)
.await?;
@@ -9385,12 +9385,7 @@ mod tests {
let end_lsn = Lsn(0x100);
let image_layers = (0x20..=0x90)
.step_by(0x10)
.map(|n| {
(
Lsn(n),
vec![(key, test_img(&format!("data key at {:x}", n)))],
)
})
.map(|n| (Lsn(n), vec![(key, test_img(&format!("data key at {n:x}")))]))
.collect();
let timeline = tenant

View File

@@ -63,8 +63,7 @@ pub fn check_valid_layermap(metadata: &[LayerName]) -> Option<String> {
&& overlaps_with(&layer.key_range, &other_layer.key_range)
{
let err = format!(
"layer violates the layer map LSN split assumption: layer {} intersects with layer {}",
layer, other_layer
"layer violates the layer map LSN split assumption: layer {layer} intersects with layer {other_layer}"
);
return Some(err);
}

View File

@@ -3,7 +3,7 @@
use std::io;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::{AtomicU64, Ordering};
use camino::Utf8PathBuf;
use num_traits::Num;
@@ -18,6 +18,7 @@ use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache;
use crate::tenant::storage_layer::inmemory_layer::GlobalResourceUnits;
use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
@@ -30,9 +31,13 @@ pub struct EphemeralFile {
_tenant_shard_id: TenantShardId,
_timeline_id: TimelineId,
page_cache_file_id: page_cache::FileId,
bytes_written: u64,
file: TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter,
buffered_writer: BufferedWriter,
buffered_writer: tokio::sync::RwLock<BufferedWriter>,
bytes_written: AtomicU64,
resource_units: std::sync::Mutex<GlobalResourceUnits>,
}
type BufferedWriter = owned_buffers_io::write::BufferedWriter<
@@ -94,9 +99,8 @@ impl EphemeralFile {
_tenant_shard_id: tenant_shard_id,
_timeline_id: timeline_id,
page_cache_file_id,
bytes_written: 0,
file: file.clone(),
buffered_writer: BufferedWriter::new(
buffered_writer: tokio::sync::RwLock::new(BufferedWriter::new(
file,
0,
|| IoBufferMut::with_capacity(TAIL_SZ),
@@ -104,7 +108,9 @@ impl EphemeralFile {
cancel.child_token(),
ctx,
info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
),
)),
bytes_written: AtomicU64::new(0),
resource_units: std::sync::Mutex::new(GlobalResourceUnits::new()),
})
}
}
@@ -151,15 +157,17 @@ impl std::ops::Deref for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter
#[derive(Debug, thiserror::Error)]
pub(crate) enum EphemeralFileWriteError {
#[error("{0}")]
TooLong(String),
#[error("cancelled")]
Cancelled,
}
impl EphemeralFile {
pub(crate) fn len(&self) -> u64 {
self.bytes_written
// TODO(vlad): The value returned here is not always correct if
// we have more than one concurrent writer. Writes are always
// sequenced, but we could grab the buffered writer lock if we wanted
// to.
self.bytes_written.load(Ordering::Acquire)
}
pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
@@ -186,7 +194,7 @@ impl EphemeralFile {
/// Panics if the write is short because there's no way we can recover from that.
/// TODO: make upstack handle this as an error.
pub(crate) async fn write_raw(
&mut self,
&self,
srcbuf: &[u8],
ctx: &RequestContext,
) -> Result<u64, EphemeralFileWriteError> {
@@ -198,22 +206,13 @@ impl EphemeralFile {
}
async fn write_raw_controlled(
&mut self,
&self,
srcbuf: &[u8],
ctx: &RequestContext,
) -> Result<(u64, Option<owned_buffers_io::write::FlushControl>), EphemeralFileWriteError> {
let pos = self.bytes_written;
let mut writer = self.buffered_writer.write().await;
let new_bytes_written = pos.checked_add(srcbuf.len().into_u64()).ok_or_else(|| {
EphemeralFileWriteError::TooLong(format!(
"write would grow EphemeralFile beyond u64::MAX: len={pos} writen={srcbuf_len}",
srcbuf_len = srcbuf.len(),
))
})?;
// Write the payload
let (nwritten, control) = self
.buffered_writer
let (nwritten, control) = writer
.write_buffered_borrowed_controlled(srcbuf, ctx)
.await
.map_err(|e| match e {
@@ -225,43 +224,69 @@ impl EphemeralFile {
"buffered writer has no short writes"
);
self.bytes_written = new_bytes_written;
// There's no realistic risk of overflow here. We won't have exabytes sized files on disk.
let pos = self
.bytes_written
.fetch_add(srcbuf.len().into_u64(), Ordering::AcqRel);
let mut resource_units = self.resource_units.lock().unwrap();
resource_units.maybe_publish_size(self.bytes_written.load(Ordering::Relaxed));
Ok((pos, control))
}
pub(crate) fn tick(&self) -> Option<u64> {
let mut resource_units = self.resource_units.lock().unwrap();
let len = self.bytes_written.load(Ordering::Relaxed);
resource_units.publish_size(len)
}
}
impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile {
async fn read_exact_at_eof_ok<B: IoBufAlignedMut + Send>(
&self,
start: u64,
dst: tokio_epoll_uring::Slice<B>,
mut dst: tokio_epoll_uring::Slice<B>,
ctx: &RequestContext,
) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
let submitted_offset = self.buffered_writer.bytes_submitted();
// We will fill the slice in back to front. Hence, we need
// the slice to be fully initialized.
// TODO(vlad): Is there a nicer way of doing this?
dst.as_mut_rust_slice_full_zeroed();
let mutable = match self.buffered_writer.inspect_mutable() {
Some(mutable) => &mutable[0..mutable.pending()],
None => {
// Timeline::cancel and hence buffered writer flush was cancelled.
// Remain read-available while timeline is shutting down.
&[]
}
};
let writer = self.buffered_writer.read().await;
let maybe_flushed = self.buffered_writer.inspect_maybe_flushed();
// Read bytes written while under lock. This is a hack to deal with concurrent
// writes updating the number of bytes written. `bytes_written` is not DIO alligned
// but we may end the read there.
//
// TODO(vlad): Feels like there's a nicer path where we align the end if it
// shoots over the end of the file.
let bytes_written = self.bytes_written.load(Ordering::Acquire);
let dst_cap = dst.bytes_total().into_u64();
let end = {
// saturating_add is correct here because the max file size is u64::MAX, so,
// if start + dst.len() > u64::MAX, then we know it will be a short read
let mut end: u64 = start.saturating_add(dst_cap);
if end > self.bytes_written {
end = self.bytes_written;
if end > bytes_written {
end = bytes_written;
}
end
};
let submitted_offset = writer.bytes_submitted();
let maybe_flushed = writer.inspect_maybe_flushed();
let mutable = match writer.inspect_mutable() {
Some(mutable) => &mutable[0..mutable.pending()],
None => {
// Timeline::cancel and hence buffered writer flush was cancelled.
// Remain read-available while timeline is shutting down.
&[]
}
};
// inclusive, exclusive
#[derive(Debug)]
struct Range<N>(N, N);
@@ -306,13 +331,33 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
let mutable_range = Range(std::cmp::max(start, submitted_offset), end);
let dst = if written_range.len() > 0 {
// There are three sources from which we might have to read data:
// 1. The file itself
// 2. The buffer which contains changes currently being flushed
// 3. The buffer which contains chnages yet to be flushed
//
// For better concurrency, we do them in reverse order: perform the in-memory
// reads while holding the writer lock, drop the writer lock and read from the
// file if required.
let dst = if mutable_range.len() > 0 {
let offset_in_buffer = mutable_range
.0
.checked_sub(submitted_offset)
.unwrap()
.into_usize();
let to_copy =
&mutable[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())];
let bounds = dst.bounds();
let slice = self
.file
.read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
.await?;
Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
let mut view = dst.slice({
let start =
written_range.len().into_usize() + maybe_flushed_range.len().into_usize();
let end = start.checked_add(mutable_range.len().into_usize()).unwrap();
start..end
});
view.as_mut_rust_slice_full_zeroed()
.copy_from_slice(to_copy);
Slice::from_buf_bounds(Slice::into_inner(view), bounds)
} else {
dst
};
@@ -342,24 +387,15 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
dst
};
let dst = if mutable_range.len() > 0 {
let offset_in_buffer = mutable_range
.0
.checked_sub(submitted_offset)
.unwrap()
.into_usize();
let to_copy =
&mutable[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())];
drop(writer);
let dst = if written_range.len() > 0 {
let bounds = dst.bounds();
let mut view = dst.slice({
let start =
written_range.len().into_usize() + maybe_flushed_range.len().into_usize();
let end = start.checked_add(mutable_range.len().into_usize()).unwrap();
start..end
});
view.as_mut_rust_slice_full_zeroed()
.copy_from_slice(to_copy);
Slice::from_buf_bounds(Slice::into_inner(view), bounds)
let slice = self
.file
.read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
.await?;
Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
} else {
dst
};
@@ -460,13 +496,15 @@ mod tests {
let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
.await
.unwrap();
let mutable = file.buffered_writer.mutable();
let writer = file.buffered_writer.read().await;
let mutable = writer.mutable();
let cap = mutable.capacity();
let align = mutable.align();
drop(writer);
let write_nbytes = cap * 2 + cap / 2;
@@ -504,10 +542,11 @@ mod tests {
let file_contents = std::fs::read(file.file.path()).unwrap();
assert!(file_contents == content[0..cap * 2]);
let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
let writer = file.buffered_writer.read().await;
let maybe_flushed_buffer_contents = writer.inspect_maybe_flushed().unwrap();
assert_eq!(&maybe_flushed_buffer_contents[..], &content[cap..cap * 2]);
let mutable_buffer_contents = file.buffered_writer.mutable();
let mutable_buffer_contents = writer.mutable();
assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]);
}
@@ -517,12 +556,14 @@ mod tests {
let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
.await
.unwrap();
// mutable buffer and maybe_flushed buffer each has `cap` bytes.
let cap = file.buffered_writer.mutable().capacity();
let writer = file.buffered_writer.read().await;
let cap = writer.mutable().capacity();
drop(writer);
let content: Vec<u8> = rand::thread_rng()
.sample_iter(rand::distributions::Standard)
@@ -540,12 +581,13 @@ mod tests {
2 * cap.into_u64(),
"buffered writer requires one write to be flushed if we write 2.5x buffer capacity"
);
let writer = file.buffered_writer.read().await;
assert_eq!(
&file.buffered_writer.inspect_maybe_flushed().unwrap()[0..cap],
&writer.inspect_maybe_flushed().unwrap()[0..cap],
&content[cap..cap * 2]
);
assert_eq!(
&file.buffered_writer.mutable()[0..cap / 2],
&writer.mutable()[0..cap / 2],
&content[cap * 2..cap * 2 + cap / 2]
);
}
@@ -563,13 +605,15 @@ mod tests {
let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
.await
.unwrap();
let mutable = file.buffered_writer.mutable();
let writer = file.buffered_writer.read().await;
let mutable = writer.mutable();
let cap = mutable.capacity();
let align = mutable.align();
drop(writer);
let content: Vec<u8> = rand::thread_rng()
.sample_iter(rand::distributions::Standard)
.take(cap * 2 + cap / 2)

View File

@@ -551,8 +551,7 @@ mod tests {
assert_eq!(
deserialized_metadata.body, expected_metadata.body,
"Metadata of the old version {} should be upgraded to the latest version {}",
METADATA_OLD_FORMAT_VERSION, METADATA_FORMAT_VERSION
"Metadata of the old version {METADATA_OLD_FORMAT_VERSION} should be upgraded to the latest version {METADATA_FORMAT_VERSION}"
);
}

View File

@@ -1427,7 +1427,7 @@ async fn init_timeline_state(
let local_meta = dentry
.metadata()
.await
.fatal_err(&format!("Read metadata on {}", file_path));
.fatal_err(&format!("Read metadata on {file_path}"));
let file_name = file_path.file_name().expect("created it from the dentry");
if crate::is_temporary(&file_path)

View File

@@ -109,7 +109,7 @@ pub(crate) enum OnDiskValue {
/// Reconstruct data accumulated for a single key during a vectored get
#[derive(Debug, Default)]
pub(crate) struct VectoredValueReconstructState {
pub struct VectoredValueReconstructState {
pub(crate) on_disk_values: Vec<(Lsn, OnDiskValueIoWaiter)>,
pub(crate) situation: ValueReconstructSituation,
@@ -244,13 +244,60 @@ impl VectoredValueReconstructState {
res
}
/// Benchmarking utility to await for the completion of all pending ios
///
/// # Cancel-Safety
///
/// Technically fine to stop polling this future, but, the IOs will still
/// be executed to completion by the sidecar task and hold on to / consume resources.
/// Better not do it to make reasonsing about the system easier.
#[cfg(feature = "benchmarking")]
pub async fn sink_pending_ios(self) -> Result<(), std::io::Error> {
let mut res = Ok(());
// We should try hard not to bail early, so that by the time we return from this
// function, all IO for this value is done. It's not required -- we could totally
// stop polling the IO futures in the sidecar task, they need to support that,
// but just stopping to poll doesn't reduce the IO load on the disk. It's easier
// to reason about the system if we just wait for all IO to complete, even if
// we're no longer interested in the result.
//
// Revisit this when IO futures are replaced with a more sophisticated IO system
// and an IO scheduler, where we know which IOs were submitted and which ones
// just queued. Cf the comment on IoConcurrency::spawn_io.
for (_lsn, waiter) in self.on_disk_values {
let value_recv_res = waiter
.wait_completion()
// we rely on the caller to poll us to completion, so this is not a bail point
.await;
match (&mut res, value_recv_res) {
(Err(_), _) => {
// We've already failed, no need to process more.
}
(Ok(_), Err(_wait_err)) => {
// This shouldn't happen - likely the sidecar task panicked.
unreachable!();
}
(Ok(_), Ok(Err(err))) => {
let err: std::io::Error = err;
res = Err(err);
}
(Ok(_ok), Ok(Ok(OnDiskValue::RawImage(_img)))) => {}
(Ok(_ok), Ok(Ok(OnDiskValue::WalRecordOrImage(_buf)))) => {}
}
}
res
}
}
/// Bag of data accumulated during a vectored get..
pub(crate) struct ValuesReconstructState {
pub struct ValuesReconstructState {
/// The keys will be removed after `get_vectored` completes. The caller outside `Timeline`
/// should not expect to get anything from this hashmap.
pub(crate) keys: HashMap<Key, VectoredValueReconstructState>,
pub keys: HashMap<Key, VectoredValueReconstructState>,
/// The keys which are already retrieved
keys_done: KeySpaceRandomAccum,
@@ -272,7 +319,7 @@ pub(crate) struct ValuesReconstructState {
/// The desired end state is that we always do parallel IO.
/// This struct and the dispatching in the impl will be removed once
/// we've built enough confidence.
pub(crate) enum IoConcurrency {
pub enum IoConcurrency {
Sequential,
SidecarTask {
task_id: usize,
@@ -317,10 +364,7 @@ impl IoConcurrency {
Self::spawn(SelectedIoConcurrency::Sequential)
}
pub(crate) fn spawn_from_conf(
conf: GetVectoredConcurrentIo,
gate_guard: GateGuard,
) -> IoConcurrency {
pub fn spawn_from_conf(conf: GetVectoredConcurrentIo, gate_guard: GateGuard) -> IoConcurrency {
let selected = match conf {
GetVectoredConcurrentIo::Sequential => SelectedIoConcurrency::Sequential,
GetVectoredConcurrentIo::SidecarTask => SelectedIoConcurrency::SidecarTask(gate_guard),
@@ -425,16 +469,6 @@ impl IoConcurrency {
}
}
pub(crate) fn clone(&self) -> Self {
match self {
IoConcurrency::Sequential => IoConcurrency::Sequential,
IoConcurrency::SidecarTask { task_id, ios_tx } => IoConcurrency::SidecarTask {
task_id: *task_id,
ios_tx: ios_tx.clone(),
},
}
}
/// Submit an IO to be executed in the background. DEADLOCK RISK, read the full doc string.
///
/// The IO is represented as an opaque future.
@@ -573,6 +607,18 @@ impl IoConcurrency {
}
}
impl Clone for IoConcurrency {
fn clone(&self) -> Self {
match self {
IoConcurrency::Sequential => IoConcurrency::Sequential,
IoConcurrency::SidecarTask { task_id, ios_tx } => IoConcurrency::SidecarTask {
task_id: *task_id,
ios_tx: ios_tx.clone(),
},
}
}
}
/// Make noise in case the [`ValuesReconstructState`] gets dropped while
/// there are still IOs in flight.
/// Refer to `collect_pending_ios` for why we prefer not to do that.
@@ -603,7 +649,7 @@ impl Drop for ValuesReconstructState {
}
impl ValuesReconstructState {
pub(crate) fn new(io_concurrency: IoConcurrency) -> Self {
pub fn new(io_concurrency: IoConcurrency) -> Self {
Self {
keys: HashMap::new(),
keys_done: KeySpaceRandomAccum::new(),

View File

@@ -783,7 +783,7 @@ impl DeltaLayer {
ctx,
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
.with_context(|| format!("Failed to open file '{path}'"))?;
let file_id = page_cache::next_file_id();
let block_reader = FileBlockReader::new(&file, file_id);
let summary_blk = block_reader.read_blk(0, ctx).await?;
@@ -1401,7 +1401,7 @@ impl DeltaLayerInner {
match val {
Value::Image(img) => {
let checkpoint = CheckPoint::decode(&img)?;
println!(" CHECKPOINT: {:?}", checkpoint);
println!(" CHECKPOINT: {checkpoint:?}");
}
Value::WalRecord(_rec) => {
println!(" unexpected walrecord value for checkpoint key");

View File

@@ -272,8 +272,7 @@ impl ImageLayer {
conf.timeline_path(&tenant_shard_id, &timeline_id)
.join(format!(
"{fname}.{:x}.{TEMP_FILE_SUFFIX}",
filename_disambiguator
"{fname}.{filename_disambiguator:x}.{TEMP_FILE_SUFFIX}"
))
}
@@ -370,7 +369,7 @@ impl ImageLayer {
ctx,
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
.with_context(|| format!("Failed to open file '{path}'"))?;
let file_id = page_cache::next_file_id();
let block_reader = FileBlockReader::new(&file, file_id);
let summary_blk = block_reader.read_blk(0, ctx).await?;
@@ -1475,7 +1474,7 @@ mod test {
assert_eq!(l1, expect_lsn);
assert_eq!(&i1, i2);
}
(o1, o2) => panic!("iterators length mismatch: {:?}, {:?}", o1, o2),
(o1, o2) => panic!("iterators length mismatch: {o1:?}, {o2:?}"),
}
}
}

View File

@@ -70,23 +70,15 @@ pub struct InMemoryLayer {
/// We use a separate lock for the index to reduce the critical section
/// during which reads cannot be planned.
///
/// If you need access to both the index and the underlying file at the same time,
/// respect the following locking order to avoid deadlocks:
/// 1. [`InMemoryLayer::inner`]
/// 2. [`InMemoryLayer::index`]
///
/// Note that the file backing [`InMemoryLayer::inner`] is append-only,
/// so it is not necessary to hold simultaneous locks on index.
/// This avoids holding index locks across IO, and is crucial for avoiding read tail latency.
/// Note that the file backing [`InMemoryLayer::file`] is append-only,
/// so it is not necessary to hold a lock on the index while reading or writing from the file.
/// In particular:
/// 1. It is safe to read and release [`InMemoryLayer::index`] before locking and reading from [`InMemoryLayer::inner`].
/// 2. It is safe to write and release [`InMemoryLayer::inner`] before locking and updating [`InMemoryLayer::index`].
/// 1. It is safe to read and release [`InMemoryLayer::index`] before reading from [`InMemoryLayer::file`].
/// 2. It is safe to write to [`InMemoryLayer::file`] before locking and updating [`InMemoryLayer::index`].
index: RwLock<BTreeMap<CompactKey, VecMap<Lsn, IndexEntry>>>,
/// The above fields never change, except for `end_lsn`, which is only set once,
/// and `index` (see rationale there).
/// All other changing parts are in `inner`, and protected by a mutex.
inner: RwLock<InMemoryLayerInner>,
/// Wrapper for the actual on-disk file. Uses interior mutability for concurrent reads/writes.
file: EphemeralFile,
estimated_in_mem_size: AtomicU64,
}
@@ -96,20 +88,10 @@ impl std::fmt::Debug for InMemoryLayer {
f.debug_struct("InMemoryLayer")
.field("start_lsn", &self.start_lsn)
.field("end_lsn", &self.end_lsn)
.field("inner", &self.inner)
.finish()
}
}
pub struct InMemoryLayerInner {
/// The values are stored in a serialized format in this file.
/// Each serialized Value is preceded by a 'u32' length field.
/// PerSeg::page_versions map stores offsets into this file.
file: EphemeralFile,
resource_units: GlobalResourceUnits,
}
/// Support the same max blob length as blob_io, because ultimately
/// all the InMemoryLayer contents end up being written into a delta layer,
/// using the [`crate::tenant::blob_io`].
@@ -258,12 +240,6 @@ struct IndexEntryUnpacked {
pos: u64,
}
impl std::fmt::Debug for InMemoryLayerInner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("InMemoryLayerInner").finish()
}
}
/// State shared by all in-memory (ephemeral) layers. Updated infrequently during background ticks in Timeline,
/// to minimize contention.
///
@@ -280,7 +256,7 @@ pub(crate) struct GlobalResources {
}
// Per-timeline RAII struct for its contribution to [`GlobalResources`]
struct GlobalResourceUnits {
pub(crate) struct GlobalResourceUnits {
// How many dirty bytes have I added to the global dirty_bytes: this guard object is responsible
// for decrementing the global counter by this many bytes when dropped.
dirty_bytes: u64,
@@ -292,7 +268,7 @@ impl GlobalResourceUnits {
// updated when the Timeline "ticks" in the background.
const MAX_SIZE_DRIFT: u64 = 10 * 1024 * 1024;
fn new() -> Self {
pub(crate) fn new() -> Self {
GLOBAL_RESOURCES
.dirty_layers
.fetch_add(1, AtomicOrdering::Relaxed);
@@ -304,7 +280,7 @@ impl GlobalResourceUnits {
///
/// Returns the effective layer size limit that should be applied, if any, to keep
/// the total number of dirty bytes below the configured maximum.
fn publish_size(&mut self, size: u64) -> Option<u64> {
pub(crate) fn publish_size(&mut self, size: u64) -> Option<u64> {
let new_global_dirty_bytes = match size.cmp(&self.dirty_bytes) {
Ordering::Equal => GLOBAL_RESOURCES.dirty_bytes.load(AtomicOrdering::Relaxed),
Ordering::Greater => {
@@ -349,7 +325,7 @@ impl GlobalResourceUnits {
// Call publish_size if the input size differs from last published size by more than
// the drift limit
fn maybe_publish_size(&mut self, size: u64) {
pub(crate) fn maybe_publish_size(&mut self, size: u64) {
let publish = match size.cmp(&self.dirty_bytes) {
Ordering::Equal => false,
Ordering::Greater => size - self.dirty_bytes > Self::MAX_SIZE_DRIFT,
@@ -398,8 +374,8 @@ impl InMemoryLayer {
}
}
pub(crate) fn try_len(&self) -> Option<u64> {
self.inner.try_read().map(|i| i.file.len()).ok()
pub(crate) fn len(&self) -> u64 {
self.file.len()
}
pub(crate) fn assert_writable(&self) {
@@ -430,7 +406,7 @@ impl InMemoryLayer {
// Look up the keys in the provided keyspace and update
// the reconstruct state with whatever is found.
pub(crate) async fn get_values_reconstruct_data(
pub async fn get_values_reconstruct_data(
self: &Arc<InMemoryLayer>,
keyspace: KeySpace,
lsn_range: Range<Lsn>,
@@ -479,14 +455,13 @@ impl InMemoryLayer {
}
}
}
drop(index); // release the lock before we spawn the IO; if it's serial-mode IO we will deadlock on the read().await below
drop(index); // release the lock before we spawn the IO
let read_from = Arc::clone(self);
let read_ctx = ctx.attached_child();
reconstruct_state
.spawn_io(async move {
let inner = read_from.inner.read().await;
let f = vectored_dio_read::execute(
&inner.file,
&read_from.file,
reads
.iter()
.flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)),
@@ -518,7 +493,6 @@ impl InMemoryLayer {
// This is kinda forced for InMemoryLayer because we need to inner.read() anyway,
// but it's less obvious for DeltaLayer and ImageLayer. So, keep this explicit
// drop for consistency among all three layer types.
drop(inner);
drop(read_from);
})
.await;
@@ -537,7 +511,7 @@ fn inmem_layer_log_display(
start_lsn: Lsn,
end_lsn: Lsn,
) -> std::fmt::Result {
write!(f, "timeline {} in-memory ", timeline)?;
write!(f, "timeline {timeline} in-memory ")?;
inmem_layer_display(f, start_lsn, end_lsn)
}
@@ -549,12 +523,6 @@ impl std::fmt::Display for InMemoryLayer {
}
impl InMemoryLayer {
/// Get layer size.
pub async fn size(&self) -> Result<u64> {
let inner = self.inner.read().await;
Ok(inner.file.len())
}
pub fn estimated_in_mem_size(&self) -> u64 {
self.estimated_in_mem_size.load(AtomicOrdering::Relaxed)
}
@@ -587,10 +555,7 @@ impl InMemoryLayer {
end_lsn: OnceLock::new(),
opened_at: Instant::now(),
index: RwLock::new(BTreeMap::new()),
inner: RwLock::new(InMemoryLayerInner {
file,
resource_units: GlobalResourceUnits::new(),
}),
file,
estimated_in_mem_size: AtomicU64::new(0),
})
}
@@ -599,41 +564,37 @@ impl InMemoryLayer {
///
/// Errors are not retryable, the [`InMemoryLayer`] must be discarded, and not be read from.
/// The reason why it's not retryable is that the [`EphemeralFile`] writes are not retryable.
///
/// This method shall not be called concurrently. We enforce this property via [`crate::tenant::Timeline::write_lock`].
///
/// TODO: it can be made retryable if we aborted the process on EphemeralFile write errors.
pub async fn put_batch(
&self,
serialized_batch: SerializedValueBatch,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let (base_offset, metadata) = {
let mut inner = self.inner.write().await;
self.assert_writable();
self.assert_writable();
let base_offset = inner.file.len();
let base_offset = self.file.len();
let SerializedValueBatch {
raw,
metadata,
max_lsn: _,
len: _,
} = serialized_batch;
let SerializedValueBatch {
raw,
metadata,
max_lsn: _,
len: _,
} = serialized_batch;
// Write the batch to the file
inner.file.write_raw(&raw, ctx).await?;
let new_size = inner.file.len();
// Write the batch to the file
self.file.write_raw(&raw, ctx).await?;
let new_size = self.file.len();
let expected_new_len = base_offset
.checked_add(raw.len().into_u64())
// write_raw would error if we were to overflow u64.
// also IndexEntry and higher levels in
//the code don't allow the file to grow that large
.unwrap();
assert_eq!(new_size, expected_new_len);
inner.resource_units.maybe_publish_size(new_size);
(base_offset, metadata)
};
let expected_new_len = base_offset
.checked_add(raw.len().into_u64())
// write_raw would error if we were to overflow u64.
// also IndexEntry and higher levels in
//the code don't allow the file to grow that large
.unwrap();
assert_eq!(new_size, expected_new_len);
// Update the index with the new entries
let mut index = self.index.write().await;
@@ -686,10 +647,8 @@ impl InMemoryLayer {
self.opened_at
}
pub(crate) async fn tick(&self) -> Option<u64> {
let mut inner = self.inner.write().await;
let size = inner.file.len();
inner.resource_units.publish_size(size)
pub(crate) fn tick(&self) -> Option<u64> {
self.file.tick()
}
pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {
@@ -753,12 +712,6 @@ impl InMemoryLayer {
gate: &utils::sync::gate::Gate,
cancel: CancellationToken,
) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
// Grab the lock in read-mode. We hold it over the I/O, but because this
// layer is not writeable anymore, no one should be trying to acquire the
// write lock on it, so we shouldn't block anyone. See the comment on
// [`InMemoryLayer::freeze`] to understand how locking between the append path
// and layer flushing works.
let inner = self.inner.read().await;
let index = self.index.read().await;
use l0_flush::Inner;
@@ -793,7 +746,7 @@ impl InMemoryLayer {
match l0_flush_global_state {
l0_flush::Inner::Direct { .. } => {
let file_contents = inner.file.load_to_io_buf(ctx).await?;
let file_contents = self.file.load_to_io_buf(ctx).await?;
let file_contents = file_contents.freeze();
for (key, vec_map) in index.iter() {

View File

@@ -380,7 +380,7 @@ impl<B: Buffer> std::fmt::Debug for LogicalReadState<B> {
write!(f, "Ongoing({:?})", BufferDebug::from(b as &dyn Buffer))
}
LogicalReadState::Ok(b) => write!(f, "Ok({:?})", BufferDebug::from(b as &dyn Buffer)),
LogicalReadState::Error(e) => write!(f, "Error({:?})", e),
LogicalReadState::Error(e) => write!(f, "Error({e:?})"),
LogicalReadState::Undefined => write!(f, "Undefined"),
}
}

View File

@@ -105,7 +105,7 @@ impl std::fmt::Display for Layer {
impl std::fmt::Debug for Layer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self)
write!(f, "{self}")
}
}

View File

@@ -178,7 +178,7 @@ pub enum LastImageLayerCreationStatus {
impl std::fmt::Display for ImageLayerCreationMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
write!(f, "{self:?}")
}
}
@@ -632,7 +632,7 @@ pub enum ReadPathLayerId {
impl std::fmt::Display for ReadPathLayerId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ReadPathLayerId::PersistentLayer(key) => write!(f, "{}", key),
ReadPathLayerId::PersistentLayer(key) => write!(f, "{key}"),
ReadPathLayerId::InMemoryLayer(range) => {
write!(f, "in-mem {}..{}", range.start, range.end)
}
@@ -708,7 +708,7 @@ impl MissingKeyError {
impl std::fmt::Debug for MissingKeyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self)
write!(f, "{self}")
}
}
@@ -721,19 +721,19 @@ impl std::fmt::Display for MissingKeyError {
)?;
if let Some(ref ancestor_lsn) = self.ancestor_lsn {
write!(f, ", ancestor {}", ancestor_lsn)?;
write!(f, ", ancestor {ancestor_lsn}")?;
}
if let Some(ref query) = self.query {
write!(f, ", query {}", query)?;
write!(f, ", query {query}")?;
}
if let Some(ref read_path) = self.read_path {
write!(f, "\n{}", read_path)?;
write!(f, "\n{read_path}")?;
}
if let Some(ref backtrace) = self.backtrace {
write!(f, "\n{}", backtrace)?;
write!(f, "\n{backtrace}")?;
}
Ok(())
@@ -816,7 +816,7 @@ impl From<layer_manager::Shutdown> for FlushLayerError {
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum GetVectoredError {
pub enum GetVectoredError {
#[error("timeline shutting down")]
Cancelled,
@@ -849,7 +849,7 @@ impl From<GetReadyAncestorError> for GetVectoredError {
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum GetReadyAncestorError {
pub enum GetReadyAncestorError {
#[error("ancestor LSN wait error")]
AncestorLsnTimeout(#[from] WaitLsnError),
@@ -939,7 +939,7 @@ impl std::fmt::Debug for Timeline {
}
#[derive(thiserror::Error, Debug, Clone)]
pub(crate) enum WaitLsnError {
pub enum WaitLsnError {
// Called on a timeline which is shutting down
#[error("Shutdown")]
Shutdown,
@@ -1902,16 +1902,11 @@ impl Timeline {
return;
};
let Some(current_size) = open_layer.try_len() else {
// Unexpected: since we hold the write guard, nobody else should be writing to this layer, so
// read lock to get size should always succeed.
tracing::warn!("Lock conflict while reading size of open layer");
return;
};
let current_size = open_layer.len();
let current_lsn = self.get_last_record_lsn();
let checkpoint_distance_override = open_layer.tick().await;
let checkpoint_distance_override = open_layer.tick();
if let Some(size_override) = checkpoint_distance_override {
if current_size > size_override {
@@ -7184,9 +7179,7 @@ impl Timeline {
if let Some(end) = layer_end_lsn {
assert!(
end <= last_record_lsn,
"advance last record lsn before inserting a layer, end_lsn={}, last_record_lsn={}",
end,
last_record_lsn,
"advance last record lsn before inserting a layer, end_lsn={end}, last_record_lsn={last_record_lsn}",
);
}
@@ -7372,7 +7365,7 @@ impl TimelineWriter<'_> {
.tl
.get_layer_for_write(at, &self.write_guard, ctx)
.await?;
let initial_size = layer.size().await?;
let initial_size = layer.len();
let last_freeze_at = self.last_freeze_at.load();
self.write_guard.replace(TimelineWriterState::new(

View File

@@ -977,7 +977,7 @@ impl KeyHistoryRetention {
tline
.reconstruct_value(key, lsn, data, RedoAttemptType::GcCompaction)
.await
.with_context(|| format!("verification failed for key {} at lsn {}", key, lsn))?;
.with_context(|| format!("verification failed for key {key} at lsn {lsn}"))?;
Ok(())
}
@@ -2647,15 +2647,15 @@ impl Timeline {
use std::fmt::Write;
let mut output = String::new();
if let Some((key, _, _)) = replay_history.first() {
write!(output, "key={} ", key).unwrap();
write!(output, "key={key} ").unwrap();
let mut cnt = 0;
for (_, lsn, val) in replay_history {
if val.is_image() {
write!(output, "i@{} ", lsn).unwrap();
write!(output, "i@{lsn} ").unwrap();
} else if val.will_init() {
write!(output, "di@{} ", lsn).unwrap();
write!(output, "di@{lsn} ").unwrap();
} else {
write!(output, "d@{} ", lsn).unwrap();
write!(output, "d@{lsn} ").unwrap();
}
cnt += 1;
if cnt >= 128 {

View File

@@ -275,12 +275,20 @@ pub(super) async fn handle_walreceiver_connection(
let copy_stream = replication_client.copy_both_simple(&query).await?;
let mut physical_stream = pin!(ReplicationStream::new(copy_stream));
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx)
.await
.map_err(|e| match e.kind {
crate::walingest::WalIngestErrorKind::Cancelled => WalReceiverError::Cancelled,
_ => WalReceiverError::Other(e.into()),
})?;
let walingest_future = WalIngest::new(timeline.as_ref(), startpoint, &ctx);
let walingest_res = select! {
walingest_res = walingest_future => walingest_res,
_ = cancellation.cancelled() => {
// We are doing reads in WalIngest::new, and those can hang as they come from the network.
// Timeline cancellation hits the walreceiver cancellation token before it hits the timeline global one.
debug!("Connection cancelled");
return Err(WalReceiverError::Cancelled);
},
};
let mut walingest = walingest_res.map_err(|e| match e.kind {
crate::walingest::WalIngestErrorKind::Cancelled => WalReceiverError::Cancelled,
_ => WalReceiverError::Other(e.into()),
})?;
let (format, compression) = match protocol {
PostgresClientProtocol::Interpreted {
@@ -360,8 +368,7 @@ pub(super) async fn handle_walreceiver_connection(
match raw_wal_start_lsn.cmp(&expected_wal_start) {
std::cmp::Ordering::Greater => {
let msg = format!(
"Gap in streamed WAL: [{}, {})",
expected_wal_start, raw_wal_start_lsn
"Gap in streamed WAL: [{expected_wal_start}, {raw_wal_start_lsn})"
);
critical!("{msg}");
return Err(WalReceiverError::Other(anyhow!(msg)));

Some files were not shown because too many files have changed in this diff Show More