mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 14:40:37 +00:00
Compare commits
25 Commits
ci-run/pr-
...
conrad/sim
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c0e1e1dd74 | ||
|
|
010cd34635 | ||
|
|
11ffe4c86c | ||
|
|
d6a5085664 | ||
|
|
16d9889a51 | ||
|
|
517a3d0d86 | ||
|
|
27ca1e21be | ||
|
|
1dc01c9bed | ||
|
|
7c4c36f5ac | ||
|
|
a2d623696c | ||
|
|
aa75722010 | ||
|
|
6c6de6382a | ||
|
|
158d84ea30 | ||
|
|
4dd9ca7b04 | ||
|
|
552249607d | ||
|
|
a29772bf6e | ||
|
|
0efff1db26 | ||
|
|
5eecde461d | ||
|
|
85164422d0 | ||
|
|
6c3aba7c44 | ||
|
|
68a175d545 | ||
|
|
5e2c444525 | ||
|
|
8d711229c1 | ||
|
|
0e490f3be7 | ||
|
|
7e41ef1bec |
@@ -189,7 +189,6 @@ jobs:
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v17_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
|
||||
|
||||
- name: Build all
|
||||
if: steps.cache_pg_14.outputs.cache-hit != 'true'
|
||||
# Note: the Makefile picks up BUILD_TYPE and CARGO_PROFILE from the env variables
|
||||
run: mold -run make ${make_vars} all -j$(nproc) CARGO_BUILD_FLAGS="$CARGO_FLAGS"
|
||||
|
||||
|
||||
83
.github/workflows/proxy-benchmark.yml
vendored
Normal file
83
.github/workflows/proxy-benchmark.yml
vendored
Normal file
@@ -0,0 +1,83 @@
|
||||
name: Periodic proxy performance test on unit-perf hetzner runner
|
||||
|
||||
on:
|
||||
push: # TODO: remove after testing
|
||||
branches:
|
||||
- test-proxy-bench # Runs on pushes to branches starting with test-proxy-bench
|
||||
# schedule:
|
||||
# * is a special character in YAML so you have to quote this string
|
||||
# ┌───────────── minute (0 - 59)
|
||||
# │ ┌───────────── hour (0 - 23)
|
||||
# │ │ ┌───────────── day of the month (1 - 31)
|
||||
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
|
||||
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
|
||||
# - cron: '0 5 * * *' # Runs at 5 UTC once a day
|
||||
workflow_dispatch: # adds an ability to run this manually
|
||||
|
||||
defaults:
|
||||
run:
|
||||
shell: bash -euo pipefail {0}
|
||||
|
||||
concurrency:
|
||||
group: ${{ github.workflow }}
|
||||
cancel-in-progress: false
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
jobs:
|
||||
run_periodic_proxybench_test:
|
||||
permissions:
|
||||
id-token: write # aws-actions/configure-aws-credentials
|
||||
statuses: write
|
||||
contents: write
|
||||
pull-requests: write
|
||||
runs-on: [self-hosted, unit-perf]
|
||||
timeout-minutes: 60 # 1h timeout
|
||||
container:
|
||||
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
|
||||
credentials:
|
||||
username: ${{ github.actor }}
|
||||
password: ${{ secrets.GITHUB_TOKEN }}
|
||||
options: --init
|
||||
steps:
|
||||
- name: Checkout proxy-bench Repo
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
repository: neondatabase/proxy-bench
|
||||
path: proxy-bench
|
||||
|
||||
- name: Set up the environment which depends on $RUNNER_TEMP on nvme drive
|
||||
id: set-env
|
||||
shell: bash -euxo pipefail {0}
|
||||
run: |
|
||||
PROXY_BENCH_PATH=$(realpath ./proxy-bench)
|
||||
{
|
||||
echo "PROXY_BENCH_PATH=$PROXY_BENCH_PATH"
|
||||
echo "NEON_DIR=${RUNNER_TEMP}/neon"
|
||||
echo "TEST_OUTPUT=${PROXY_BENCH_PATH}/test_output"
|
||||
echo ""
|
||||
} >> "$GITHUB_ENV"
|
||||
|
||||
- name: Run proxy-bench
|
||||
run: ./${PROXY_BENCH_PATH}/run.sh
|
||||
|
||||
- name: Ingest Bench Results # neon repo script
|
||||
if: success()
|
||||
run: |
|
||||
mkdir -p $TEST_OUTPUT
|
||||
python $NEON_DIR/scripts/proxy_bench_results_ingest.py --out $TEST_OUTPUT
|
||||
|
||||
- name: Push Metrics to Proxy perf database
|
||||
if: success()
|
||||
env:
|
||||
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PROXY_TEST_RESULT_CONNSTR }}"
|
||||
REPORT_FROM: $TEST_OUTPUT
|
||||
run: $NEON_DIR/scripts/generate_and_push_perf_report.sh
|
||||
|
||||
- name: Docker cleanup
|
||||
run: docker compose down
|
||||
|
||||
- name: Notify Failure
|
||||
if: failure()
|
||||
run: echo "Proxy bench job failed" && exit 1
|
||||
2
Makefile
2
Makefile
@@ -159,8 +159,6 @@ postgres-%: postgres-configure-% \
|
||||
postgres-headers-% # to prevent `make install` conflicts with neon's `postgres-headers`
|
||||
+@echo "Compiling PostgreSQL $*"
|
||||
$(MAKE) -C $(BUILD_DIR)/$* MAKELEVEL=0 install
|
||||
+@echo "Compiling libpq $*"
|
||||
$(MAKE) -C $(BUILD_DIR)/$*/src/interfaces/libpq install
|
||||
+@echo "Compiling pg_prewarm $*"
|
||||
$(MAKE) -C $(BUILD_DIR)/$*/contrib/pg_prewarm install
|
||||
+@echo "Compiling pg_buffercache $*"
|
||||
|
||||
@@ -22,7 +22,7 @@ sql_exporter.yml: $(jsonnet_files)
|
||||
--output-file etc/$@ \
|
||||
--tla-str collector_name=neon_collector \
|
||||
--tla-str collector_file=neon_collector.yml \
|
||||
--tla-str 'connection_string=postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable&application_name=sql_exporter' \
|
||||
--tla-str 'connection_string=postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable&application_name=sql_exporter&pgaudit.log=none' \
|
||||
etc/sql_exporter.jsonnet
|
||||
|
||||
sql_exporter_autoscaling.yml: $(jsonnet_files)
|
||||
@@ -30,7 +30,7 @@ sql_exporter_autoscaling.yml: $(jsonnet_files)
|
||||
--output-file etc/$@ \
|
||||
--tla-str collector_name=neon_collector_autoscaling \
|
||||
--tla-str collector_file=neon_collector_autoscaling.yml \
|
||||
--tla-str 'connection_string=postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable&application_name=sql_exporter_autoscaling' \
|
||||
--tla-str 'connection_string=postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable&application_name=sql_exporter_autoscaling&pgaudit.log=none' \
|
||||
etc/sql_exporter.jsonnet
|
||||
|
||||
.PHONY: clean
|
||||
|
||||
@@ -171,9 +171,6 @@ RUN cd postgres && \
|
||||
eval $CONFIGURE_CMD && \
|
||||
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s install && \
|
||||
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C contrib/ install && \
|
||||
# Install headers
|
||||
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C src/include install && \
|
||||
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C src/interfaces/libpq install && \
|
||||
# Enable some of contrib extensions
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/autoinc.control && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/dblink.control && \
|
||||
@@ -1568,20 +1565,20 @@ ARG PG_VERSION
|
||||
WORKDIR /ext-src
|
||||
RUN case "${PG_VERSION}" in \
|
||||
"v14") \
|
||||
export PGAUDIT_VERSION=1.6.2 \
|
||||
export PGAUDIT_CHECKSUM=1f350d70a0cbf488c0f2b485e3a5c9b11f78ad9e3cbb95ef6904afa1eb3187eb \
|
||||
export PGAUDIT_VERSION=1.6.3 \
|
||||
export PGAUDIT_CHECKSUM=37a8f5a7cc8d9188e536d15cf0fdc457fcdab2547caedb54442c37f124110919 \
|
||||
;; \
|
||||
"v15") \
|
||||
export PGAUDIT_VERSION=1.7.0 \
|
||||
export PGAUDIT_CHECKSUM=8f4a73e451c88c567e516e6cba7dc1e23bc91686bb6f1f77f8f3126d428a8bd8 \
|
||||
export PGAUDIT_VERSION=1.7.1 \
|
||||
export PGAUDIT_CHECKSUM=e9c8e6e092d82b2f901d72555ce0fe7780552f35f8985573796cd7e64b09d4ec \
|
||||
;; \
|
||||
"v16") \
|
||||
export PGAUDIT_VERSION=16.0 \
|
||||
export PGAUDIT_CHECKSUM=d53ef985f2d0b15ba25c512c4ce967dce07b94fd4422c95bd04c4c1a055fe738 \
|
||||
export PGAUDIT_VERSION=16.1 \
|
||||
export PGAUDIT_CHECKSUM=3bae908ab70ba0c6f51224009dbcfff1a97bd6104c6273297a64292e1b921fee \
|
||||
;; \
|
||||
"v17") \
|
||||
export PGAUDIT_VERSION=17.0 \
|
||||
export PGAUDIT_CHECKSUM=7d0d08d030275d525f36cd48b38c6455f1023da863385badff0cec44965bfd8c \
|
||||
export PGAUDIT_VERSION=17.1 \
|
||||
export PGAUDIT_CHECKSUM=9c5f37504d393486cc75d2ced83f75f5899be64fa85f689d6babb833b4361e6c \
|
||||
;; \
|
||||
*) \
|
||||
echo "pgaudit is not supported on this PostgreSQL version" && exit 1;; \
|
||||
|
||||
@@ -26,7 +26,7 @@ commands:
|
||||
- name: postgres-exporter
|
||||
user: nobody
|
||||
sysvInitAction: respawn
|
||||
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter" /bin/postgres_exporter --config.file=/etc/postgres_exporter.yml'
|
||||
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter pgaudit.log=none" /bin/postgres_exporter --config.file=/etc/postgres_exporter.yml'
|
||||
- name: pgbouncer-exporter
|
||||
user: postgres
|
||||
sysvInitAction: respawn
|
||||
@@ -59,7 +59,7 @@ files:
|
||||
# the rules use ALL as the hostname. Avoid the pointless lookups and the "unable to
|
||||
# resolve host" log messages that they generate.
|
||||
Defaults !fqdn
|
||||
|
||||
|
||||
# Allow postgres user (which is what compute_ctl runs as) to run /neonvm/bin/resize-swap
|
||||
# and /neonvm/bin/set-disk-quota as root without requiring entering a password (NOPASSWD),
|
||||
# regardless of hostname (ALL)
|
||||
|
||||
@@ -26,7 +26,7 @@ commands:
|
||||
- name: postgres-exporter
|
||||
user: nobody
|
||||
sysvInitAction: respawn
|
||||
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter" /bin/postgres_exporter --config.file=/etc/postgres_exporter.yml'
|
||||
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter pgaudit.log=none" /bin/postgres_exporter --config.file=/etc/postgres_exporter.yml'
|
||||
- name: pgbouncer-exporter
|
||||
user: postgres
|
||||
sysvInitAction: respawn
|
||||
@@ -59,7 +59,7 @@ files:
|
||||
# the rules use ALL as the hostname. Avoid the pointless lookups and the "unable to
|
||||
# resolve host" log messages that they generate.
|
||||
Defaults !fqdn
|
||||
|
||||
|
||||
# Allow postgres user (which is what compute_ctl runs as) to run /neonvm/bin/resize-swap
|
||||
# and /neonvm/bin/set-disk-quota as root without requiring entering a password (NOPASSWD),
|
||||
# regardless of hostname (ALL)
|
||||
|
||||
@@ -482,10 +482,8 @@ async fn cmd_pgdata(
|
||||
};
|
||||
|
||||
let superuser = "cloud_admin";
|
||||
let destination_connstring = format!(
|
||||
"host=localhost port={} user={} dbname=neondb",
|
||||
pg_port, superuser
|
||||
);
|
||||
let destination_connstring =
|
||||
format!("host=localhost port={pg_port} user={superuser} dbname=neondb");
|
||||
|
||||
let pgdata_dir = workdir.join("pgdata");
|
||||
let mut proc = PostgresProcess::new(pgdata_dir.clone(), pg_bin_dir.clone(), pg_lib_dir.clone());
|
||||
|
||||
@@ -69,7 +69,7 @@ impl clap::builder::TypedValueParser for S3Uri {
|
||||
S3Uri::from_str(value_str).map_err(|e| {
|
||||
clap::Error::raw(
|
||||
clap::error::ErrorKind::InvalidValue,
|
||||
format!("Failed to parse S3 URI: {}", e),
|
||||
format!("Failed to parse S3 URI: {e}"),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -22,7 +22,7 @@ pub async fn get_dbs_and_roles(compute: &Arc<ComputeNode>) -> anyhow::Result<Cat
|
||||
|
||||
spawn(async move {
|
||||
if let Err(e) = connection.await {
|
||||
eprintln!("connection error: {}", e);
|
||||
eprintln!("connection error: {e}");
|
||||
}
|
||||
});
|
||||
|
||||
@@ -119,7 +119,7 @@ pub async fn get_database_schema(
|
||||
_ => {
|
||||
let mut lines = stderr_reader.lines();
|
||||
if let Some(line) = lines.next_line().await? {
|
||||
if line.contains(&format!("FATAL: database \"{}\" does not exist", dbname)) {
|
||||
if line.contains(&format!("FATAL: database \"{dbname}\" does not exist")) {
|
||||
return Err(SchemaDumpError::DatabaseDoesNotExist);
|
||||
}
|
||||
warn!("pg_dump stderr: {}", line)
|
||||
|
||||
@@ -250,8 +250,7 @@ impl ParsedSpec {
|
||||
// duplicate entry?
|
||||
if current == previous {
|
||||
return Err(format!(
|
||||
"duplicate entry in safekeeper_connstrings: {}!",
|
||||
current,
|
||||
"duplicate entry in safekeeper_connstrings: {current}!",
|
||||
));
|
||||
}
|
||||
|
||||
@@ -406,11 +405,11 @@ impl ComputeNode {
|
||||
// that can affect `compute_ctl` and prevent it from properly configuring the database schema.
|
||||
// Unset them via connection string options before connecting to the database.
|
||||
// N.B. keep it in sync with `ZENITH_OPTIONS` in `get_maintenance_client()`.
|
||||
const EXTRA_OPTIONS: &str = "-c role=cloud_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0";
|
||||
const EXTRA_OPTIONS: &str = "-c role=cloud_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0 -c pgaudit.log=none";
|
||||
let options = match conn_conf.get_options() {
|
||||
// Allow the control plane to override any options set by the
|
||||
// compute
|
||||
Some(options) => format!("{} {}", EXTRA_OPTIONS, options),
|
||||
Some(options) => format!("{EXTRA_OPTIONS} {options}"),
|
||||
None => EXTRA_OPTIONS.to_string(),
|
||||
};
|
||||
conn_conf.options(&options);
|
||||
@@ -1127,7 +1126,7 @@ impl ComputeNode {
|
||||
let sk_configs = sk_connstrs.into_iter().map(|connstr| {
|
||||
// Format connstr
|
||||
let id = connstr.clone();
|
||||
let connstr = format!("postgresql://no_user@{}", connstr);
|
||||
let connstr = format!("postgresql://no_user@{connstr}");
|
||||
let options = format!(
|
||||
"-c timeline_id={} tenant_id={}",
|
||||
pspec.timeline_id, pspec.tenant_id
|
||||
@@ -1490,7 +1489,7 @@ impl ComputeNode {
|
||||
let (mut client, connection) = conf.connect(NoTls).await?;
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = connection.await {
|
||||
eprintln!("connection error: {}", e);
|
||||
eprintln!("connection error: {e}");
|
||||
}
|
||||
});
|
||||
|
||||
@@ -1633,7 +1632,7 @@ impl ComputeNode {
|
||||
Ok((mut client, connection)) => {
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = connection.await {
|
||||
eprintln!("connection error: {}", e);
|
||||
eprintln!("connection error: {e}");
|
||||
}
|
||||
});
|
||||
if let Err(e) = handle_migrations(&mut client).await {
|
||||
@@ -1937,7 +1936,7 @@ impl ComputeNode {
|
||||
let (client, connection) = connect_result.unwrap();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = connection.await {
|
||||
eprintln!("connection error: {}", e);
|
||||
eprintln!("connection error: {e}");
|
||||
}
|
||||
});
|
||||
let result = client
|
||||
@@ -2106,7 +2105,7 @@ LIMIT 100",
|
||||
db_client
|
||||
.simple_query(&query)
|
||||
.await
|
||||
.with_context(|| format!("Failed to execute query: {}", query))?;
|
||||
.with_context(|| format!("Failed to execute query: {query}"))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -2133,7 +2132,7 @@ LIMIT 100",
|
||||
let version: Option<ExtVersion> = db_client
|
||||
.query_opt(version_query, &[&ext_name])
|
||||
.await
|
||||
.with_context(|| format!("Failed to execute query: {}", version_query))?
|
||||
.with_context(|| format!("Failed to execute query: {version_query}"))?
|
||||
.map(|row| row.get(0));
|
||||
|
||||
// sanitize the inputs as postgres idents.
|
||||
@@ -2148,14 +2147,14 @@ LIMIT 100",
|
||||
db_client
|
||||
.simple_query(&query)
|
||||
.await
|
||||
.with_context(|| format!("Failed to execute query: {}", query))?;
|
||||
.with_context(|| format!("Failed to execute query: {query}"))?;
|
||||
} else {
|
||||
let query =
|
||||
format!("CREATE EXTENSION IF NOT EXISTS {ext_name} WITH VERSION {quoted_version}");
|
||||
db_client
|
||||
.simple_query(&query)
|
||||
.await
|
||||
.with_context(|| format!("Failed to execute query: {}", query))?;
|
||||
.with_context(|| format!("Failed to execute query: {query}"))?;
|
||||
}
|
||||
|
||||
Ok(ext_version)
|
||||
|
||||
@@ -51,7 +51,7 @@ pub fn write_postgres_conf(
|
||||
|
||||
// Write the postgresql.conf content from the spec file as is.
|
||||
if let Some(conf) = &spec.cluster.postgresql_conf {
|
||||
writeln!(file, "{}", conf)?;
|
||||
writeln!(file, "{conf}")?;
|
||||
}
|
||||
|
||||
// Add options for connecting to storage
|
||||
@@ -70,7 +70,7 @@ pub fn write_postgres_conf(
|
||||
);
|
||||
// If generation is given, prepend sk list with g#number:
|
||||
if let Some(generation) = spec.safekeepers_generation {
|
||||
write!(neon_safekeepers_value, "g#{}:", generation)?;
|
||||
write!(neon_safekeepers_value, "g#{generation}:")?;
|
||||
}
|
||||
neon_safekeepers_value.push_str(&spec.safekeeper_connstrings.join(","));
|
||||
writeln!(
|
||||
@@ -109,8 +109,8 @@ pub fn write_postgres_conf(
|
||||
tls::update_key_path_blocking(pgdata_path, tls_config);
|
||||
|
||||
// these are the default, but good to be explicit.
|
||||
writeln!(file, "ssl_cert_file = '{}'", SERVER_CRT)?;
|
||||
writeln!(file, "ssl_key_file = '{}'", SERVER_KEY)?;
|
||||
writeln!(file, "ssl_cert_file = '{SERVER_CRT}'")?;
|
||||
writeln!(file, "ssl_key_file = '{SERVER_KEY}'")?;
|
||||
}
|
||||
|
||||
// Locales
|
||||
@@ -191,8 +191,7 @@ pub fn write_postgres_conf(
|
||||
}
|
||||
writeln!(
|
||||
file,
|
||||
"shared_preload_libraries='{}{}'",
|
||||
libs, extra_shared_preload_libraries
|
||||
"shared_preload_libraries='{libs}{extra_shared_preload_libraries}'"
|
||||
)?;
|
||||
} else {
|
||||
// Typically, this should be unreacheable,
|
||||
@@ -244,8 +243,7 @@ pub fn write_postgres_conf(
|
||||
}
|
||||
writeln!(
|
||||
file,
|
||||
"shared_preload_libraries='{}{}'",
|
||||
libs, extra_shared_preload_libraries
|
||||
"shared_preload_libraries='{libs}{extra_shared_preload_libraries}'"
|
||||
)?;
|
||||
} else {
|
||||
// Typically, this should be unreacheable,
|
||||
@@ -263,7 +261,7 @@ pub fn write_postgres_conf(
|
||||
}
|
||||
}
|
||||
|
||||
writeln!(file, "neon.extension_server_port={}", extension_server_port)?;
|
||||
writeln!(file, "neon.extension_server_port={extension_server_port}")?;
|
||||
|
||||
if spec.drop_subscriptions_before_start {
|
||||
writeln!(file, "neon.disable_logical_replication_subscribers=true")?;
|
||||
@@ -291,7 +289,7 @@ where
|
||||
{
|
||||
let path = pgdata_path.join("compute_ctl_temp_override.conf");
|
||||
let mut file = File::create(path)?;
|
||||
write!(file, "{}", options)?;
|
||||
write!(file, "{options}")?;
|
||||
|
||||
let res = exec();
|
||||
|
||||
|
||||
@@ -296,10 +296,7 @@ async fn download_extension_tar(remote_ext_base_url: &Url, ext_path: &str) -> Re
|
||||
async fn do_extension_server_request(uri: Url) -> Result<Bytes, (String, String)> {
|
||||
let resp = reqwest::get(uri).await.map_err(|e| {
|
||||
(
|
||||
format!(
|
||||
"could not perform remote extensions server request: {:?}",
|
||||
e
|
||||
),
|
||||
format!("could not perform remote extensions server request: {e:?}"),
|
||||
UNKNOWN_HTTP_STATUS.to_string(),
|
||||
)
|
||||
})?;
|
||||
@@ -309,7 +306,7 @@ async fn do_extension_server_request(uri: Url) -> Result<Bytes, (String, String)
|
||||
StatusCode::OK => match resp.bytes().await {
|
||||
Ok(resp) => Ok(resp),
|
||||
Err(e) => Err((
|
||||
format!("could not read remote extensions server response: {:?}", e),
|
||||
format!("could not read remote extensions server response: {e:?}"),
|
||||
// It's fine to return and report error with status as 200 OK,
|
||||
// because we still failed to read the response.
|
||||
status.to_string(),
|
||||
@@ -320,10 +317,7 @@ async fn do_extension_server_request(uri: Url) -> Result<Bytes, (String, String)
|
||||
status.to_string(),
|
||||
)),
|
||||
_ => Err((
|
||||
format!(
|
||||
"unexpected remote extensions server response status code: {}",
|
||||
status
|
||||
),
|
||||
format!("unexpected remote extensions server response status code: {status}"),
|
||||
status.to_string(),
|
||||
)),
|
||||
}
|
||||
|
||||
@@ -65,7 +65,7 @@ pub(in crate::http) async fn configure(
|
||||
|
||||
if state.status == ComputeStatus::Failed {
|
||||
let err = state.error.as_ref().map_or("unknown error", |x| x);
|
||||
let msg = format!("compute configuration failed: {:?}", err);
|
||||
let msg = format!("compute configuration failed: {err:?}");
|
||||
return Err(msg);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,7 +43,7 @@ pub async fn get_installed_extensions(mut conf: Config) -> Result<InstalledExten
|
||||
let (mut client, connection) = conf.connect(NoTls).await?;
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = connection.await {
|
||||
eprintln!("connection error: {}", e);
|
||||
eprintln!("connection error: {e}");
|
||||
}
|
||||
});
|
||||
|
||||
@@ -57,7 +57,7 @@ pub async fn get_installed_extensions(mut conf: Config) -> Result<InstalledExten
|
||||
let (client, connection) = conf.connect(NoTls).await?;
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = connection.await {
|
||||
eprintln!("connection error: {}", e);
|
||||
eprintln!("connection error: {e}");
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
@@ -130,7 +130,7 @@ fn try_acquire_lsn_lease(
|
||||
lsn: Lsn,
|
||||
) -> Result<Option<SystemTime>> {
|
||||
let mut client = config.connect(NoTls)?;
|
||||
let cmd = format!("lease lsn {} {} {} ", tenant_shard_id, timeline_id, lsn);
|
||||
let cmd = format!("lease lsn {tenant_shard_id} {timeline_id} {lsn} ");
|
||||
let res = client.simple_query(&cmd)?;
|
||||
let msg = match res.first() {
|
||||
Some(msg) => msg,
|
||||
|
||||
@@ -36,9 +36,9 @@ pub fn escape_literal(s: &str) -> String {
|
||||
let res = s.replace('\'', "''").replace('\\', "\\\\");
|
||||
|
||||
if res.contains('\\') {
|
||||
format!("E'{}'", res)
|
||||
format!("E'{res}'")
|
||||
} else {
|
||||
format!("'{}'", res)
|
||||
format!("'{res}'")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -46,7 +46,7 @@ pub fn escape_literal(s: &str) -> String {
|
||||
/// with `'{}'` is not required, as it returns a ready-to-use config string.
|
||||
pub fn escape_conf_value(s: &str) -> String {
|
||||
let res = s.replace('\'', "''").replace('\\', "\\\\");
|
||||
format!("'{}'", res)
|
||||
format!("'{res}'")
|
||||
}
|
||||
|
||||
pub trait GenericOptionExt {
|
||||
@@ -446,7 +446,7 @@ pub async fn tune_pgbouncer(
|
||||
let mut pgbouncer_connstr =
|
||||
"host=localhost port=6432 dbname=pgbouncer user=postgres sslmode=disable".to_string();
|
||||
if let Ok(pass) = std::env::var("PGBOUNCER_PASSWORD") {
|
||||
pgbouncer_connstr.push_str(format!(" password={}", pass).as_str());
|
||||
pgbouncer_connstr.push_str(format!(" password={pass}").as_str());
|
||||
}
|
||||
pgbouncer_connstr
|
||||
};
|
||||
@@ -464,7 +464,7 @@ pub async fn tune_pgbouncer(
|
||||
Ok((client, connection)) => {
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = connection.await {
|
||||
eprintln!("connection error: {}", e);
|
||||
eprintln!("connection error: {e}");
|
||||
}
|
||||
});
|
||||
break client;
|
||||
|
||||
@@ -23,12 +23,12 @@ fn do_control_plane_request(
|
||||
) -> Result<ControlPlaneConfigResponse, (bool, String, String)> {
|
||||
let resp = reqwest::blocking::Client::new()
|
||||
.get(uri)
|
||||
.header("Authorization", format!("Bearer {}", jwt))
|
||||
.header("Authorization", format!("Bearer {jwt}"))
|
||||
.send()
|
||||
.map_err(|e| {
|
||||
(
|
||||
true,
|
||||
format!("could not perform request to control plane: {:?}", e),
|
||||
format!("could not perform request to control plane: {e:?}"),
|
||||
UNKNOWN_HTTP_STATUS.to_string(),
|
||||
)
|
||||
})?;
|
||||
@@ -39,7 +39,7 @@ fn do_control_plane_request(
|
||||
Ok(spec_resp) => Ok(spec_resp),
|
||||
Err(e) => Err((
|
||||
true,
|
||||
format!("could not deserialize control plane response: {:?}", e),
|
||||
format!("could not deserialize control plane response: {e:?}"),
|
||||
status.to_string(),
|
||||
)),
|
||||
},
|
||||
@@ -62,7 +62,7 @@ fn do_control_plane_request(
|
||||
// or some internal failure happened. Doesn't make much sense to retry in this case.
|
||||
_ => Err((
|
||||
false,
|
||||
format!("unexpected control plane response status code: {}", status),
|
||||
format!("unexpected control plane response status code: {status}"),
|
||||
status.to_string(),
|
||||
)),
|
||||
}
|
||||
|
||||
@@ -933,56 +933,53 @@ async fn get_operations<'a>(
|
||||
PerDatabasePhase::DeleteDBRoleReferences => {
|
||||
let ctx = ctx.read().await;
|
||||
|
||||
let operations =
|
||||
spec.delta_operations
|
||||
.iter()
|
||||
.flatten()
|
||||
.filter(|op| op.action == "delete_role")
|
||||
.filter_map(move |op| {
|
||||
if db.is_owned_by(&op.name) {
|
||||
return None;
|
||||
}
|
||||
if !ctx.roles.contains_key(&op.name) {
|
||||
return None;
|
||||
}
|
||||
let quoted = op.name.pg_quote();
|
||||
let new_owner = match &db {
|
||||
DB::SystemDB => PgIdent::from("cloud_admin").pg_quote(),
|
||||
DB::UserDB(db) => db.owner.pg_quote(),
|
||||
};
|
||||
let (escaped_role, outer_tag) = op.name.pg_quote_dollar();
|
||||
let operations = spec
|
||||
.delta_operations
|
||||
.iter()
|
||||
.flatten()
|
||||
.filter(|op| op.action == "delete_role")
|
||||
.filter_map(move |op| {
|
||||
if db.is_owned_by(&op.name) {
|
||||
return None;
|
||||
}
|
||||
if !ctx.roles.contains_key(&op.name) {
|
||||
return None;
|
||||
}
|
||||
let quoted = op.name.pg_quote();
|
||||
let new_owner = match &db {
|
||||
DB::SystemDB => PgIdent::from("cloud_admin").pg_quote(),
|
||||
DB::UserDB(db) => db.owner.pg_quote(),
|
||||
};
|
||||
let (escaped_role, outer_tag) = op.name.pg_quote_dollar();
|
||||
|
||||
Some(vec![
|
||||
// This will reassign all dependent objects to the db owner
|
||||
Operation {
|
||||
query: format!(
|
||||
"REASSIGN OWNED BY {} TO {}",
|
||||
quoted, new_owner,
|
||||
),
|
||||
comment: None,
|
||||
},
|
||||
// Revoke some potentially blocking privileges (Neon-specific currently)
|
||||
Operation {
|
||||
query: format!(
|
||||
include_str!("sql/pre_drop_role_revoke_privileges.sql"),
|
||||
// N.B. this has to be properly dollar-escaped with `pg_quote_dollar()`
|
||||
role_name = escaped_role,
|
||||
outer_tag = outer_tag,
|
||||
),
|
||||
comment: None,
|
||||
},
|
||||
// This now will only drop privileges of the role
|
||||
// TODO: this is obviously not 100% true because of the above case,
|
||||
// there could be still some privileges that are not revoked. Maybe this
|
||||
// only drops privileges that were granted *by this* role, not *to this* role,
|
||||
// but this has to be checked.
|
||||
Operation {
|
||||
query: format!("DROP OWNED BY {}", quoted),
|
||||
comment: None,
|
||||
},
|
||||
])
|
||||
})
|
||||
.flatten();
|
||||
Some(vec![
|
||||
// This will reassign all dependent objects to the db owner
|
||||
Operation {
|
||||
query: format!("REASSIGN OWNED BY {quoted} TO {new_owner}",),
|
||||
comment: None,
|
||||
},
|
||||
// Revoke some potentially blocking privileges (Neon-specific currently)
|
||||
Operation {
|
||||
query: format!(
|
||||
include_str!("sql/pre_drop_role_revoke_privileges.sql"),
|
||||
// N.B. this has to be properly dollar-escaped with `pg_quote_dollar()`
|
||||
role_name = escaped_role,
|
||||
outer_tag = outer_tag,
|
||||
),
|
||||
comment: None,
|
||||
},
|
||||
// This now will only drop privileges of the role
|
||||
// TODO: this is obviously not 100% true because of the above case,
|
||||
// there could be still some privileges that are not revoked. Maybe this
|
||||
// only drops privileges that were granted *by this* role, not *to this* role,
|
||||
// but this has to be checked.
|
||||
Operation {
|
||||
query: format!("DROP OWNED BY {quoted}"),
|
||||
comment: None,
|
||||
},
|
||||
])
|
||||
})
|
||||
.flatten();
|
||||
|
||||
Ok(Box::new(operations))
|
||||
}
|
||||
|
||||
@@ -27,7 +27,7 @@ pub async fn ping_safekeeper(
|
||||
let (client, conn) = config.connect(tokio_postgres::NoTls).await?;
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = conn.await {
|
||||
eprintln!("connection error: {}", e);
|
||||
eprintln!("connection error: {e}");
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
@@ -919,7 +919,7 @@ fn print_timeline(
|
||||
br_sym = "┗━";
|
||||
}
|
||||
|
||||
print!("{} @{}: ", br_sym, ancestor_lsn);
|
||||
print!("{br_sym} @{ancestor_lsn}: ");
|
||||
}
|
||||
|
||||
// Finally print a timeline id and name with new line
|
||||
@@ -1742,7 +1742,7 @@ async fn handle_pageserver(subcmd: &PageserverCmd, env: &local_env::LocalEnv) ->
|
||||
StopMode::Immediate => true,
|
||||
};
|
||||
if let Err(e) = get_pageserver(env, args.pageserver_id)?.stop(immediate) {
|
||||
eprintln!("pageserver stop failed: {}", e);
|
||||
eprintln!("pageserver stop failed: {e}");
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
@@ -1751,7 +1751,7 @@ async fn handle_pageserver(subcmd: &PageserverCmd, env: &local_env::LocalEnv) ->
|
||||
let pageserver = get_pageserver(env, args.pageserver_id)?;
|
||||
//TODO what shutdown strategy should we use here?
|
||||
if let Err(e) = pageserver.stop(false) {
|
||||
eprintln!("pageserver stop failed: {}", e);
|
||||
eprintln!("pageserver stop failed: {e}");
|
||||
exit(1);
|
||||
}
|
||||
|
||||
@@ -1768,7 +1768,7 @@ async fn handle_pageserver(subcmd: &PageserverCmd, env: &local_env::LocalEnv) ->
|
||||
{
|
||||
Ok(_) => println!("Page server is up and running"),
|
||||
Err(err) => {
|
||||
eprintln!("Page server is not available: {}", err);
|
||||
eprintln!("Page server is not available: {err}");
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
@@ -1805,7 +1805,7 @@ async fn handle_storage_controller(
|
||||
},
|
||||
};
|
||||
if let Err(e) = svc.stop(stop_args).await {
|
||||
eprintln!("stop failed: {}", e);
|
||||
eprintln!("stop failed: {e}");
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
@@ -1827,7 +1827,7 @@ async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) ->
|
||||
let safekeeper = get_safekeeper(env, args.id)?;
|
||||
|
||||
if let Err(e) = safekeeper.start(&args.extra_opt, &args.start_timeout).await {
|
||||
eprintln!("safekeeper start failed: {}", e);
|
||||
eprintln!("safekeeper start failed: {e}");
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
@@ -1839,7 +1839,7 @@ async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) ->
|
||||
StopMode::Immediate => true,
|
||||
};
|
||||
if let Err(e) = safekeeper.stop(immediate) {
|
||||
eprintln!("safekeeper stop failed: {}", e);
|
||||
eprintln!("safekeeper stop failed: {e}");
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
@@ -1852,12 +1852,12 @@ async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) ->
|
||||
};
|
||||
|
||||
if let Err(e) = safekeeper.stop(immediate) {
|
||||
eprintln!("safekeeper stop failed: {}", e);
|
||||
eprintln!("safekeeper stop failed: {e}");
|
||||
exit(1);
|
||||
}
|
||||
|
||||
if let Err(e) = safekeeper.start(&args.extra_opt, &args.start_timeout).await {
|
||||
eprintln!("safekeeper start failed: {}", e);
|
||||
eprintln!("safekeeper start failed: {e}");
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
@@ -2113,7 +2113,7 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
|
||||
|
||||
let storage = EndpointStorage::from_env(env);
|
||||
if let Err(e) = storage.stop(immediate) {
|
||||
eprintln!("endpoint_storage stop failed: {:#}", e);
|
||||
eprintln!("endpoint_storage stop failed: {e:#}");
|
||||
}
|
||||
|
||||
for ps_conf in &env.pageservers {
|
||||
|
||||
@@ -847,10 +847,10 @@ impl Endpoint {
|
||||
|
||||
// Launch compute_ctl
|
||||
let conn_str = self.connstr("cloud_admin", "postgres");
|
||||
println!("Starting postgres node at '{}'", conn_str);
|
||||
println!("Starting postgres node at '{conn_str}'");
|
||||
if create_test_user {
|
||||
let conn_str = self.connstr("test", "neondb");
|
||||
println!("Also at '{}'", conn_str);
|
||||
println!("Also at '{conn_str}'");
|
||||
}
|
||||
let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
|
||||
cmd.args([
|
||||
@@ -949,8 +949,7 @@ impl Endpoint {
|
||||
Err(e) => {
|
||||
if Instant::now().duration_since(start_at) > start_timeout {
|
||||
return Err(e).context(format!(
|
||||
"timed out {:?} waiting to connect to compute_ctl HTTP",
|
||||
start_timeout,
|
||||
"timed out {start_timeout:?} waiting to connect to compute_ctl HTTP",
|
||||
));
|
||||
}
|
||||
}
|
||||
@@ -989,7 +988,7 @@ impl Endpoint {
|
||||
// reqwest does not export its error construction utility functions, so let's craft the message ourselves
|
||||
let url = response.url().to_owned();
|
||||
let msg = match response.text().await {
|
||||
Ok(err_body) => format!("Error: {}", err_body),
|
||||
Ok(err_body) => format!("Error: {err_body}"),
|
||||
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
|
||||
};
|
||||
Err(anyhow::anyhow!(msg))
|
||||
@@ -1055,7 +1054,7 @@ impl Endpoint {
|
||||
} else {
|
||||
let url = response.url().to_owned();
|
||||
let msg = match response.text().await {
|
||||
Ok(err_body) => format!("Error: {}", err_body),
|
||||
Ok(err_body) => format!("Error: {err_body}"),
|
||||
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
|
||||
};
|
||||
Err(anyhow::anyhow!(msg))
|
||||
|
||||
@@ -212,6 +212,8 @@ pub struct NeonStorageControllerConf {
|
||||
pub use_local_compute_notifications: bool,
|
||||
|
||||
pub timeline_safekeeper_count: Option<i64>,
|
||||
|
||||
pub kick_secondary_downloads: Option<bool>,
|
||||
}
|
||||
|
||||
impl NeonStorageControllerConf {
|
||||
@@ -243,6 +245,7 @@ impl Default for NeonStorageControllerConf {
|
||||
use_https_safekeeper_api: false,
|
||||
use_local_compute_notifications: true,
|
||||
timeline_safekeeper_count: None,
|
||||
kick_secondary_downloads: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -258,7 +261,7 @@ impl Default for EndpointStorageConf {
|
||||
impl NeonBroker {
|
||||
pub fn client_url(&self) -> Url {
|
||||
let url = if let Some(addr) = self.listen_https_addr {
|
||||
format!("https://{}", addr)
|
||||
format!("https://{addr}")
|
||||
} else {
|
||||
format!(
|
||||
"http://{}",
|
||||
@@ -727,7 +730,7 @@ impl LocalEnv {
|
||||
let config_toml_path = dentry.path().join("pageserver.toml");
|
||||
let config_toml: PageserverConfigTomlSubset = toml_edit::de::from_str(
|
||||
&std::fs::read_to_string(&config_toml_path)
|
||||
.with_context(|| format!("read {:?}", config_toml_path))?,
|
||||
.with_context(|| format!("read {config_toml_path:?}"))?,
|
||||
)
|
||||
.context("parse pageserver.toml")?;
|
||||
let identity_toml_path = dentry.path().join("identity.toml");
|
||||
@@ -737,7 +740,7 @@ impl LocalEnv {
|
||||
}
|
||||
let identity_toml: IdentityTomlSubset = toml_edit::de::from_str(
|
||||
&std::fs::read_to_string(&identity_toml_path)
|
||||
.with_context(|| format!("read {:?}", identity_toml_path))?,
|
||||
.with_context(|| format!("read {identity_toml_path:?}"))?,
|
||||
)
|
||||
.context("parse identity.toml")?;
|
||||
let PageserverConfigTomlSubset {
|
||||
|
||||
@@ -122,7 +122,7 @@ impl PageServerNode {
|
||||
.env
|
||||
.generate_auth_token(&Claims::new(None, Scope::GenerationsApi))
|
||||
.unwrap();
|
||||
overrides.push(format!("control_plane_api_token='{}'", jwt_token));
|
||||
overrides.push(format!("control_plane_api_token='{jwt_token}'"));
|
||||
}
|
||||
|
||||
if !conf.other.contains_key("remote_storage") {
|
||||
|
||||
@@ -143,7 +143,7 @@ impl SafekeeperNode {
|
||||
let id_string = id.to_string();
|
||||
// TODO: add availability_zone to the config.
|
||||
// Right now we just specify any value here and use it to check metrics in tests.
|
||||
let availability_zone = format!("sk-{}", id_string);
|
||||
let availability_zone = format!("sk-{id_string}");
|
||||
|
||||
let mut args = vec![
|
||||
"-D".to_owned(),
|
||||
|
||||
@@ -167,7 +167,7 @@ impl StorageController {
|
||||
fn storage_controller_instance_dir(&self, instance_id: u8) -> PathBuf {
|
||||
self.env
|
||||
.base_data_dir
|
||||
.join(format!("storage_controller_{}", instance_id))
|
||||
.join(format!("storage_controller_{instance_id}"))
|
||||
}
|
||||
|
||||
fn pid_file(&self, instance_id: u8) -> Utf8PathBuf {
|
||||
@@ -226,7 +226,7 @@ impl StorageController {
|
||||
"-d",
|
||||
DB_NAME,
|
||||
"-p",
|
||||
&format!("{}", postgres_port),
|
||||
&format!("{postgres_port}"),
|
||||
];
|
||||
let pg_lib_dir = self.get_pg_lib_dir().await.unwrap();
|
||||
let envs = [
|
||||
@@ -269,7 +269,7 @@ impl StorageController {
|
||||
"-h",
|
||||
"localhost",
|
||||
"-p",
|
||||
&format!("{}", postgres_port),
|
||||
&format!("{postgres_port}"),
|
||||
"-U",
|
||||
&username(),
|
||||
"-O",
|
||||
@@ -431,7 +431,7 @@ impl StorageController {
|
||||
// from `LocalEnv`'s config file (`.neon/config`).
|
||||
tokio::fs::write(
|
||||
&pg_data_path.join("postgresql.conf"),
|
||||
format!("port = {}\nfsync=off\n", postgres_port),
|
||||
format!("port = {postgres_port}\nfsync=off\n"),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -483,7 +483,7 @@ impl StorageController {
|
||||
self.setup_database(postgres_port).await?;
|
||||
}
|
||||
|
||||
let database_url = format!("postgresql://localhost:{}/{DB_NAME}", postgres_port);
|
||||
let database_url = format!("postgresql://localhost:{postgres_port}/{DB_NAME}");
|
||||
|
||||
// We support running a startup SQL script to fiddle with the database before we launch storcon.
|
||||
// This is used by the test suite.
|
||||
@@ -514,7 +514,7 @@ impl StorageController {
|
||||
drop(client);
|
||||
conn.await??;
|
||||
|
||||
let addr = format!("{}:{}", host, listen_port);
|
||||
let addr = format!("{host}:{listen_port}");
|
||||
let address_for_peers = Uri::builder()
|
||||
.scheme(scheme)
|
||||
.authority(addr.clone())
|
||||
@@ -563,6 +563,10 @@ impl StorageController {
|
||||
args.push("--use-local-compute-notifications".to_string());
|
||||
}
|
||||
|
||||
if let Some(value) = self.config.kick_secondary_downloads {
|
||||
args.push(format!("--kick-secondary-downloads={value}"));
|
||||
}
|
||||
|
||||
if let Some(ssl_ca_file) = self.env.ssl_ca_cert_path() {
|
||||
args.push(format!("--ssl-ca-file={}", ssl_ca_file.to_str().unwrap()));
|
||||
}
|
||||
@@ -812,9 +816,9 @@ impl StorageController {
|
||||
builder = builder.json(&body)
|
||||
}
|
||||
if let Some(private_key) = &self.private_key {
|
||||
println!("Getting claims for path {}", path);
|
||||
println!("Getting claims for path {path}");
|
||||
if let Some(required_claims) = Self::get_claims_for_path(&path)? {
|
||||
println!("Got claims {:?} for path {}", required_claims, path);
|
||||
println!("Got claims {required_claims:?} for path {path}");
|
||||
let jwt_token = encode_from_key_file(&required_claims, private_key)?;
|
||||
builder = builder.header(
|
||||
reqwest::header::AUTHORIZATION,
|
||||
|
||||
@@ -649,7 +649,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
response
|
||||
.new_shards
|
||||
.iter()
|
||||
.map(|s| format!("{:?}", s))
|
||||
.map(|s| format!("{s:?}"))
|
||||
.collect::<Vec<_>>()
|
||||
.join(",")
|
||||
);
|
||||
@@ -771,8 +771,8 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
println!("Tenant {tenant_id}");
|
||||
let mut table = comfy_table::Table::new();
|
||||
table.add_row(["Policy", &format!("{:?}", policy)]);
|
||||
table.add_row(["Stripe size", &format!("{:?}", stripe_size)]);
|
||||
table.add_row(["Policy", &format!("{policy:?}")]);
|
||||
table.add_row(["Stripe size", &format!("{stripe_size:?}")]);
|
||||
table.add_row(["Config", &serde_json::to_string_pretty(&config).unwrap()]);
|
||||
println!("{table}");
|
||||
println!("Shards:");
|
||||
@@ -789,7 +789,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
let secondary = shard
|
||||
.node_secondary
|
||||
.iter()
|
||||
.map(|n| format!("{}", n))
|
||||
.map(|n| format!("{n}"))
|
||||
.collect::<Vec<_>>()
|
||||
.join(",");
|
||||
|
||||
@@ -863,7 +863,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
} else {
|
||||
// Make it obvious to the user that since they've omitted an AZ, we're clearing it
|
||||
eprintln!("Clearing preferred AZ for tenant {}", tenant_id);
|
||||
eprintln!("Clearing preferred AZ for tenant {tenant_id}");
|
||||
}
|
||||
|
||||
// Construct a request that modifies all the tenant's shards
|
||||
@@ -1134,8 +1134,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
Err((tenant_shard_id, from, to, error)) => {
|
||||
failure += 1;
|
||||
println!(
|
||||
"Failed to migrate {} from node {} to node {}: {}",
|
||||
tenant_shard_id, from, to, error
|
||||
"Failed to migrate {tenant_shard_id} from node {from} to node {to}: {error}"
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -1277,8 +1276,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
concurrency,
|
||||
} => {
|
||||
let mut path = format!(
|
||||
"/v1/tenant/{}/timeline/{}/download_heatmap_layers",
|
||||
tenant_shard_id, timeline_id,
|
||||
"/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers",
|
||||
);
|
||||
|
||||
if let Some(c) = concurrency {
|
||||
@@ -1303,8 +1301,7 @@ async fn watch_tenant_shard(
|
||||
) -> anyhow::Result<()> {
|
||||
if let Some(until_migrated_to) = until_migrated_to {
|
||||
println!(
|
||||
"Waiting for tenant shard {} to be migrated to node {}",
|
||||
tenant_shard_id, until_migrated_to
|
||||
"Waiting for tenant shard {tenant_shard_id} to be migrated to node {until_migrated_to}"
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1327,7 +1324,7 @@ async fn watch_tenant_shard(
|
||||
"attached: {} secondary: {} {}",
|
||||
shard
|
||||
.node_attached
|
||||
.map(|n| format!("{}", n))
|
||||
.map(|n| format!("{n}"))
|
||||
.unwrap_or("none".to_string()),
|
||||
shard
|
||||
.node_secondary
|
||||
@@ -1341,15 +1338,12 @@ async fn watch_tenant_shard(
|
||||
"(reconciler idle)"
|
||||
}
|
||||
);
|
||||
println!("{}", summary);
|
||||
println!("{summary}");
|
||||
|
||||
// Maybe drop out if we finished migration
|
||||
if let Some(until_migrated_to) = until_migrated_to {
|
||||
if shard.node_attached == Some(until_migrated_to) && !shard.is_reconciling {
|
||||
println!(
|
||||
"Tenant shard {} is now on node {}",
|
||||
tenant_shard_id, until_migrated_to
|
||||
);
|
||||
println!("Tenant shard {tenant_shard_id} is now on node {until_migrated_to}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -374,7 +374,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
|
||||
let request = Request::builder()
|
||||
.uri(format!("/{tenant}/{timeline}/{endpoint}/sub/path/key"))
|
||||
.method(method)
|
||||
.header("Authorization", format!("Bearer {}", token))
|
||||
.header("Authorization", format!("Bearer {token}"))
|
||||
.body(Body::empty())
|
||||
.unwrap();
|
||||
let status = ServiceExt::ready(&mut app)
|
||||
|
||||
@@ -71,7 +71,7 @@ impl Runtime {
|
||||
debug!("thread panicked: {:?}", e);
|
||||
let mut result = ctx.result.lock();
|
||||
if result.0 == -1 {
|
||||
*result = (256, format!("thread panicked: {:?}", e));
|
||||
*result = (256, format!("thread panicked: {e:?}"));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -47,8 +47,8 @@ impl Debug for AnyMessage {
|
||||
match self {
|
||||
AnyMessage::None => write!(f, "None"),
|
||||
AnyMessage::InternalConnect => write!(f, "InternalConnect"),
|
||||
AnyMessage::Just32(v) => write!(f, "Just32({})", v),
|
||||
AnyMessage::ReplCell(v) => write!(f, "ReplCell({:?})", v),
|
||||
AnyMessage::Just32(v) => write!(f, "Just32({v})"),
|
||||
AnyMessage::ReplCell(v) => write!(f, "ReplCell({v:?})"),
|
||||
AnyMessage::Bytes(v) => write!(f, "Bytes({})", hex::encode(v)),
|
||||
AnyMessage::LSN(v) => write!(f, "LSN({})", Lsn(*v)),
|
||||
}
|
||||
|
||||
@@ -582,14 +582,14 @@ pub fn attach_openapi_ui(
|
||||
deepLinking: true,
|
||||
showExtensions: true,
|
||||
showCommonExtensions: true,
|
||||
url: "{}",
|
||||
url: "{spec_mount_path}",
|
||||
}})
|
||||
window.ui = ui;
|
||||
}};
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
"#, spec_mount_path))).unwrap())
|
||||
"#))).unwrap())
|
||||
})
|
||||
)
|
||||
}
|
||||
@@ -696,7 +696,7 @@ mod tests {
|
||||
let remote_addr = SocketAddr::new(IpAddr::from_str("127.0.0.1").unwrap(), 80);
|
||||
let mut service = builder.build(remote_addr);
|
||||
if let Err(e) = poll_fn(|ctx| service.poll_ready(ctx)).await {
|
||||
panic!("request service is not ready: {:?}", e);
|
||||
panic!("request service is not ready: {e:?}");
|
||||
}
|
||||
|
||||
let mut req: Request<Body> = Request::default();
|
||||
@@ -716,7 +716,7 @@ mod tests {
|
||||
let remote_addr = SocketAddr::new(IpAddr::from_str("127.0.0.1").unwrap(), 80);
|
||||
let mut service = builder.build(remote_addr);
|
||||
if let Err(e) = poll_fn(|ctx| service.poll_ready(ctx)).await {
|
||||
panic!("request service is not ready: {:?}", e);
|
||||
panic!("request service is not ready: {e:?}");
|
||||
}
|
||||
|
||||
let req: Request<Body> = Request::default();
|
||||
|
||||
@@ -86,7 +86,7 @@ impl ShmemHandle {
|
||||
// somewhat smaller than that, because with anything close to that, you'll run out of
|
||||
// memory anyway.
|
||||
if max_size >= 1 << 48 {
|
||||
panic!("max size {} too large", max_size);
|
||||
panic!("max size {max_size} too large");
|
||||
}
|
||||
if initial_size > max_size {
|
||||
panic!("initial size {initial_size} larger than max size {max_size}");
|
||||
@@ -279,7 +279,7 @@ mod tests {
|
||||
fn assert_range(ptr: *const u8, expected: u8, range: Range<usize>) {
|
||||
for i in range {
|
||||
let b = unsafe { *(ptr.add(i)) };
|
||||
assert_eq!(expected, b, "unexpected byte at offset {}", i);
|
||||
assert_eq!(expected, b, "unexpected byte at offset {i}");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -76,6 +76,10 @@ pub struct PostHogConfig {
|
||||
pub private_api_url: String,
|
||||
/// Public API URL
|
||||
pub public_api_url: String,
|
||||
/// Refresh interval for the feature flag spec
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub refresh_interval: Option<Duration>,
|
||||
}
|
||||
|
||||
/// `pageserver.toml`
|
||||
|
||||
@@ -577,8 +577,7 @@ mod test {
|
||||
let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
|
||||
assert!(
|
||||
err.to_string().contains("unknown field `unknown_field`"),
|
||||
"expect unknown field `unknown_field` error, got: {}",
|
||||
err
|
||||
"expect unknown field `unknown_field` error, got: {err}"
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -334,8 +334,7 @@ impl KeySpace {
|
||||
std::cmp::max(range.start, prev.start) < std::cmp::min(range.end, prev.end);
|
||||
assert!(
|
||||
!overlap,
|
||||
"Attempt to merge ovelapping keyspaces: {:?} overlaps {:?}",
|
||||
prev, range
|
||||
"Attempt to merge ovelapping keyspaces: {prev:?} overlaps {range:?}"
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1104,7 +1103,7 @@ mod tests {
|
||||
// total range contains at least one shard-local page
|
||||
let all_nonzero = fragments.iter().all(|f| f.0 > 0);
|
||||
if !all_nonzero {
|
||||
eprintln!("Found a zero-length fragment: {:?}", fragments);
|
||||
eprintln!("Found a zero-length fragment: {fragments:?}");
|
||||
}
|
||||
assert!(all_nonzero);
|
||||
} else {
|
||||
|
||||
@@ -1183,7 +1183,7 @@ impl Display for ImageCompressionAlgorithm {
|
||||
ImageCompressionAlgorithm::Disabled => write!(f, "disabled"),
|
||||
ImageCompressionAlgorithm::Zstd { level } => {
|
||||
if let Some(level) = level {
|
||||
write!(f, "zstd({})", level)
|
||||
write!(f, "zstd({level})")
|
||||
} else {
|
||||
write!(f, "zstd")
|
||||
}
|
||||
@@ -2012,8 +2012,7 @@ mod tests {
|
||||
let err = serde_json::from_value::<TenantConfigRequest>(config_request).unwrap_err();
|
||||
assert!(
|
||||
err.to_string().contains("unknown field `unknown_field`"),
|
||||
"expect unknown field `unknown_field` error, got: {}",
|
||||
err
|
||||
"expect unknown field `unknown_field` error, got: {err}"
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -939,7 +939,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackendReader<IO> {
|
||||
FeMessage::CopyFail => Err(CopyStreamHandlerEnd::CopyFail),
|
||||
FeMessage::Terminate => Err(CopyStreamHandlerEnd::Terminate),
|
||||
_ => Err(CopyStreamHandlerEnd::from(ConnectionError::Protocol(
|
||||
ProtocolError::Protocol(format!("unexpected message in COPY stream {:?}", msg)),
|
||||
ProtocolError::Protocol(format!("unexpected message in COPY stream {msg:?}")),
|
||||
))),
|
||||
},
|
||||
None => Err(CopyStreamHandlerEnd::EOF),
|
||||
|
||||
@@ -61,7 +61,7 @@ async fn simple_select() {
|
||||
// so spawn it off to run on its own.
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = connection.await {
|
||||
eprintln!("connection error: {}", e);
|
||||
eprintln!("connection error: {e}");
|
||||
}
|
||||
});
|
||||
|
||||
@@ -137,7 +137,7 @@ async fn simple_select_ssl() {
|
||||
// so spawn it off to run on its own.
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = connection.await {
|
||||
eprintln!("connection error: {}", e);
|
||||
eprintln!("connection error: {e}");
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
@@ -223,7 +223,7 @@ mod tests_pg_connection_config {
|
||||
assert_eq!(cfg.port(), 123);
|
||||
assert_eq!(cfg.raw_address(), "stub.host.example:123");
|
||||
assert_eq!(
|
||||
format!("{:?}", cfg),
|
||||
format!("{cfg:?}"),
|
||||
"PgConnectionConfig { host: Domain(\"stub.host.example\"), port: 123, password: None }"
|
||||
);
|
||||
}
|
||||
@@ -239,7 +239,7 @@ mod tests_pg_connection_config {
|
||||
assert_eq!(cfg.port(), 123);
|
||||
assert_eq!(cfg.raw_address(), "[::1]:123");
|
||||
assert_eq!(
|
||||
format!("{:?}", cfg),
|
||||
format!("{cfg:?}"),
|
||||
"PgConnectionConfig { host: Ipv6(::1), port: 123, password: None }"
|
||||
);
|
||||
}
|
||||
@@ -252,7 +252,7 @@ mod tests_pg_connection_config {
|
||||
assert_eq!(cfg.port(), 123);
|
||||
assert_eq!(cfg.raw_address(), "stub.host.example:123");
|
||||
assert_eq!(
|
||||
format!("{:?}", cfg),
|
||||
format!("{cfg:?}"),
|
||||
"PgConnectionConfig { host: Domain(\"stub.host.example\"), port: 123, password: Some(REDACTED-STRING) }"
|
||||
);
|
||||
}
|
||||
|
||||
@@ -114,7 +114,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder {
|
||||
|
||||
let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf).map_err(
|
||||
|e| WalDecodeError {
|
||||
msg: format!("long header deserialization failed {}", e),
|
||||
msg: format!("long header deserialization failed {e}"),
|
||||
lsn: self.lsn,
|
||||
},
|
||||
)?;
|
||||
@@ -130,7 +130,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder {
|
||||
let hdr =
|
||||
XLogPageHeaderData::from_bytes(&mut self.inputbuf).map_err(|e| {
|
||||
WalDecodeError {
|
||||
msg: format!("header deserialization failed {}", e),
|
||||
msg: format!("header deserialization failed {e}"),
|
||||
lsn: self.lsn,
|
||||
}
|
||||
})?;
|
||||
@@ -155,7 +155,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder {
|
||||
let xl_tot_len = (&self.inputbuf[0..4]).get_u32_le();
|
||||
if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
|
||||
return Err(WalDecodeError {
|
||||
msg: format!("invalid xl_tot_len {}", xl_tot_len),
|
||||
msg: format!("invalid xl_tot_len {xl_tot_len}"),
|
||||
lsn: self.lsn,
|
||||
});
|
||||
}
|
||||
@@ -218,7 +218,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder {
|
||||
let xlogrec =
|
||||
XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]).map_err(|e| {
|
||||
WalDecodeError {
|
||||
msg: format!("xlog record deserialization failed {}", e),
|
||||
msg: format!("xlog record deserialization failed {e}"),
|
||||
lsn: self.lsn,
|
||||
}
|
||||
})?;
|
||||
|
||||
@@ -1196,7 +1196,7 @@ pub fn describe_postgres_wal_record(record: &Bytes) -> Result<String, Deserializ
|
||||
pg_constants::XLOG_HEAP2_MULTI_INSERT => "HEAP2 MULTI_INSERT",
|
||||
pg_constants::XLOG_HEAP2_VISIBLE => "HEAP2 VISIBLE",
|
||||
_ => {
|
||||
unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
|
||||
unknown_str = format!("HEAP2 UNKNOWN_0x{info:02x}");
|
||||
&unknown_str
|
||||
}
|
||||
}
|
||||
@@ -1209,7 +1209,7 @@ pub fn describe_postgres_wal_record(record: &Bytes) -> Result<String, Deserializ
|
||||
pg_constants::XLOG_HEAP_UPDATE => "HEAP UPDATE",
|
||||
pg_constants::XLOG_HEAP_HOT_UPDATE => "HEAP HOT_UPDATE",
|
||||
_ => {
|
||||
unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
|
||||
unknown_str = format!("HEAP2 UNKNOWN_0x{info:02x}");
|
||||
&unknown_str
|
||||
}
|
||||
}
|
||||
@@ -1220,7 +1220,7 @@ pub fn describe_postgres_wal_record(record: &Bytes) -> Result<String, Deserializ
|
||||
pg_constants::XLOG_FPI => "XLOG FPI",
|
||||
pg_constants::XLOG_FPI_FOR_HINT => "XLOG FPI_FOR_HINT",
|
||||
_ => {
|
||||
unknown_str = format!("XLOG UNKNOWN_0x{:02x}", info);
|
||||
unknown_str = format!("XLOG UNKNOWN_0x{info:02x}");
|
||||
&unknown_str
|
||||
}
|
||||
}
|
||||
@@ -1228,7 +1228,7 @@ pub fn describe_postgres_wal_record(record: &Bytes) -> Result<String, Deserializ
|
||||
rmid => {
|
||||
let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
|
||||
unknown_str = format!("UNKNOWN_RM_{} INFO_0x{:02x}", rmid, info);
|
||||
unknown_str = format!("UNKNOWN_RM_{rmid} INFO_0x{info:02x}");
|
||||
&unknown_str
|
||||
}
|
||||
};
|
||||
|
||||
@@ -34,7 +34,7 @@ fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
|
||||
let cfg = Conf {
|
||||
pg_version,
|
||||
pg_distrib_dir: top_path.join("pg_install"),
|
||||
datadir: top_path.join(format!("test_output/{}-{PG_MAJORVERSION}", test_name)),
|
||||
datadir: top_path.join(format!("test_output/{test_name}-{PG_MAJORVERSION}")),
|
||||
};
|
||||
if cfg.datadir.exists() {
|
||||
fs::remove_dir_all(&cfg.datadir).unwrap();
|
||||
|
||||
@@ -32,15 +32,15 @@ pub enum Error {
|
||||
impl fmt::Display for Error {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match self {
|
||||
Error::Spawn(e) => write!(f, "Error spawning command: {:?}", e),
|
||||
Error::Spawn(e) => write!(f, "Error spawning command: {e:?}"),
|
||||
Error::Failed { status, stderr } => write!(
|
||||
f,
|
||||
"Command failed with status {:?}: {}",
|
||||
status,
|
||||
String::from_utf8_lossy(stderr)
|
||||
),
|
||||
Error::WaitOutput(e) => write!(f, "Error waiting for command output: {:?}", e),
|
||||
Error::Other(e) => write!(f, "Error: {:?}", e),
|
||||
Error::WaitOutput(e) => write!(f, "Error waiting for command output: {e:?}"),
|
||||
Error::Other(e) => write!(f, "Error: {e:?}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
use serde::ser::SerializeTuple;
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
use serde_repr::{Deserialize_repr, Serialize_repr};
|
||||
use std::fmt::{Display, Formatter};
|
||||
@@ -79,12 +78,12 @@ impl PgMajorVersion {
|
||||
///
|
||||
/// The PG_VERSION file is used to determine the PostgreSQL version that currently
|
||||
/// owns the data in a PostgreSQL data directory.
|
||||
pub fn versionfile_string(&self) -> String {
|
||||
pub fn versionfile_string(&self) -> &'static str {
|
||||
match self {
|
||||
PgMajorVersion::PG17 => "17\x0A".to_string(),
|
||||
PgMajorVersion::PG16 => "16\x0A".to_string(),
|
||||
PgMajorVersion::PG15 => "15".to_string(),
|
||||
PgMajorVersion::PG14 => "14".to_string(),
|
||||
PgMajorVersion::PG14 => "14",
|
||||
PgMajorVersion::PG15 => "15",
|
||||
PgMajorVersion::PG16 => "16\x0A",
|
||||
PgMajorVersion::PG17 => "17\x0A",
|
||||
}
|
||||
}
|
||||
|
||||
@@ -94,15 +93,16 @@ impl PgMajorVersion {
|
||||
/// implementation.
|
||||
pub fn v_str(&self) -> String {
|
||||
match self {
|
||||
PgMajorVersion::PG17 => "v17".to_string(),
|
||||
PgMajorVersion::PG16 => "v16".to_string(),
|
||||
PgMajorVersion::PG15 => "v15".to_string(),
|
||||
PgMajorVersion::PG14 => "v14".to_string(),
|
||||
PgMajorVersion::PG14 => "v14",
|
||||
PgMajorVersion::PG15 => "v15",
|
||||
PgMajorVersion::PG16 => "v16",
|
||||
PgMajorVersion::PG17 => "v17",
|
||||
}
|
||||
.to_string()
|
||||
}
|
||||
|
||||
/// All currently supported major versions of PostgreSQL.
|
||||
pub const ALL: [PgMajorVersion; 4] = [
|
||||
pub const ALL: &'static [PgMajorVersion] = &[
|
||||
PgMajorVersion::PG14,
|
||||
PgMajorVersion::PG15,
|
||||
PgMajorVersion::PG16,
|
||||
@@ -112,20 +112,12 @@ impl PgMajorVersion {
|
||||
|
||||
impl Display for PgMajorVersion {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
PgMajorVersion::PG14 => {
|
||||
write!(f, "PgMajorVersion::PG14")
|
||||
}
|
||||
PgMajorVersion::PG15 => {
|
||||
write!(f, "PgMajorVersion::PG15")
|
||||
}
|
||||
PgMajorVersion::PG16 => {
|
||||
write!(f, "PgMajorVersion::PG16")
|
||||
}
|
||||
PgMajorVersion::PG17 => {
|
||||
write!(f, "PgMajorVersion::PG17")
|
||||
}
|
||||
}
|
||||
f.write_str(match self {
|
||||
PgMajorVersion::PG14 => "PgMajorVersion::PG14",
|
||||
PgMajorVersion::PG15 => "PgMajorVersion::PG15",
|
||||
PgMajorVersion::PG16 => "PgMajorVersion::PG16",
|
||||
PgMajorVersion::PG17 => "PgMajorVersion::PG17",
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -135,8 +127,7 @@ pub struct InvalidPgVersion(u32);
|
||||
|
||||
impl Display for InvalidPgVersion {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
f.serialize_tuple_struct("InvalidPgVersion", 1)?
|
||||
.serialize_element(&self.0)
|
||||
write!(f, "InvalidPgVersion({})", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -165,8 +156,7 @@ pub struct PgMajorVersionParseError(String);
|
||||
|
||||
impl Display for PgMajorVersionParseError {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
f.serialize_tuple_struct("PgMajorVersionParseError", 1)?
|
||||
.serialize_element(&self.0)
|
||||
write!(f, "PgMajorVersionParseError({})", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -174,12 +164,12 @@ impl FromStr for PgMajorVersion {
|
||||
type Err = PgMajorVersionParseError;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
match s {
|
||||
"14" => Ok(PgMajorVersion::PG14),
|
||||
"15" => Ok(PgMajorVersion::PG15),
|
||||
"16" => Ok(PgMajorVersion::PG16),
|
||||
"17" => Ok(PgMajorVersion::PG17),
|
||||
_ => Err(PgMajorVersionParseError(s.to_string())),
|
||||
}
|
||||
Ok(match s {
|
||||
"14" => PgMajorVersion::PG14,
|
||||
"15" => PgMajorVersion::PG15,
|
||||
"16" => PgMajorVersion::PG16,
|
||||
"17" => PgMajorVersion::PG17,
|
||||
_ => return Err(PgMajorVersionParseError(s.to_string())),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,7 +36,10 @@ impl FeatureResolverBackgroundLoop {
|
||||
// Main loop of updating the feature flags.
|
||||
handle.spawn(
|
||||
async move {
|
||||
tracing::info!("Starting PostHog feature resolver");
|
||||
tracing::info!(
|
||||
"Starting PostHog feature resolver with refresh period: {:?}",
|
||||
refresh_period
|
||||
);
|
||||
let mut ticker = tokio::time::interval(refresh_period);
|
||||
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||
loop {
|
||||
|
||||
@@ -168,15 +168,13 @@ impl FeatureStore {
|
||||
let PostHogFlagFilterPropertyValue::String(provided) = provided else {
|
||||
// Left should be a string
|
||||
return Err(PostHogEvaluationError::Internal(format!(
|
||||
"The left side of the condition is not a string: {:?}",
|
||||
provided
|
||||
"The left side of the condition is not a string: {provided:?}"
|
||||
)));
|
||||
};
|
||||
let PostHogFlagFilterPropertyValue::List(requested) = requested else {
|
||||
// Right should be a list of string
|
||||
return Err(PostHogEvaluationError::Internal(format!(
|
||||
"The right side of the condition is not a list: {:?}",
|
||||
requested
|
||||
"The right side of the condition is not a list: {requested:?}"
|
||||
)));
|
||||
};
|
||||
Ok(requested.contains(provided))
|
||||
@@ -185,14 +183,12 @@ impl FeatureStore {
|
||||
let PostHogFlagFilterPropertyValue::String(requested) = requested else {
|
||||
// Right should be a string
|
||||
return Err(PostHogEvaluationError::Internal(format!(
|
||||
"The right side of the condition is not a string: {:?}",
|
||||
requested
|
||||
"The right side of the condition is not a string: {requested:?}"
|
||||
)));
|
||||
};
|
||||
let Ok(requested) = requested.parse::<f64>() else {
|
||||
return Err(PostHogEvaluationError::Internal(format!(
|
||||
"Can not parse the right side of the condition as a number: {:?}",
|
||||
requested
|
||||
"Can not parse the right side of the condition as a number: {requested:?}"
|
||||
)));
|
||||
};
|
||||
// Left can either be a number or a string
|
||||
@@ -201,16 +197,14 @@ impl FeatureStore {
|
||||
PostHogFlagFilterPropertyValue::String(provided) => {
|
||||
let Ok(provided) = provided.parse::<f64>() else {
|
||||
return Err(PostHogEvaluationError::Internal(format!(
|
||||
"Can not parse the left side of the condition as a number: {:?}",
|
||||
provided
|
||||
"Can not parse the left side of the condition as a number: {provided:?}"
|
||||
)));
|
||||
};
|
||||
provided
|
||||
}
|
||||
_ => {
|
||||
return Err(PostHogEvaluationError::Internal(format!(
|
||||
"The left side of the condition is not a number or a string: {:?}",
|
||||
provided
|
||||
"The left side of the condition is not a number or a string: {provided:?}"
|
||||
)));
|
||||
}
|
||||
};
|
||||
@@ -218,14 +212,12 @@ impl FeatureStore {
|
||||
"lt" => Ok(provided < requested),
|
||||
"gt" => Ok(provided > requested),
|
||||
op => Err(PostHogEvaluationError::Internal(format!(
|
||||
"Unsupported operator: {}",
|
||||
op
|
||||
"Unsupported operator: {op}"
|
||||
))),
|
||||
}
|
||||
}
|
||||
_ => Err(PostHogEvaluationError::Internal(format!(
|
||||
"Unsupported operator: {}",
|
||||
operator
|
||||
"Unsupported operator: {operator}"
|
||||
))),
|
||||
}
|
||||
}
|
||||
@@ -373,8 +365,7 @@ impl FeatureStore {
|
||||
if let Some(flag_config) = self.flags.get(flag_key) {
|
||||
if !flag_config.active {
|
||||
return Err(PostHogEvaluationError::NotAvailable(format!(
|
||||
"The feature flag is not active: {}",
|
||||
flag_key
|
||||
"The feature flag is not active: {flag_key}"
|
||||
)));
|
||||
}
|
||||
let Some(ref multivariate) = flag_config.filters.multivariate else {
|
||||
@@ -401,8 +392,7 @@ impl FeatureStore {
|
||||
// This should not happen because the rollout percentage always adds up to 100, but just in case that PostHog
|
||||
// returned invalid spec, we return an error.
|
||||
return Err(PostHogEvaluationError::Internal(format!(
|
||||
"Rollout percentage does not add up to 100: {}",
|
||||
flag_key
|
||||
"Rollout percentage does not add up to 100: {flag_key}"
|
||||
)));
|
||||
}
|
||||
GroupEvaluationResult::Unmatched => continue,
|
||||
@@ -413,8 +403,7 @@ impl FeatureStore {
|
||||
} else {
|
||||
// The feature flag is not available yet
|
||||
Err(PostHogEvaluationError::NotAvailable(format!(
|
||||
"Not found in the local evaluation spec: {}",
|
||||
flag_key
|
||||
"Not found in the local evaluation spec: {flag_key}"
|
||||
)))
|
||||
}
|
||||
}
|
||||
@@ -440,8 +429,7 @@ impl FeatureStore {
|
||||
if let Some(flag_config) = self.flags.get(flag_key) {
|
||||
if !flag_config.active {
|
||||
return Err(PostHogEvaluationError::NotAvailable(format!(
|
||||
"The feature flag is not active: {}",
|
||||
flag_key
|
||||
"The feature flag is not active: {flag_key}"
|
||||
)));
|
||||
}
|
||||
if flag_config.filters.multivariate.is_some() {
|
||||
@@ -456,8 +444,7 @@ impl FeatureStore {
|
||||
match self.evaluate_group(group, hash_on_global_rollout_percentage, properties)? {
|
||||
GroupEvaluationResult::MatchedAndOverride(_) => {
|
||||
return Err(PostHogEvaluationError::Internal(format!(
|
||||
"Boolean flag cannot have overrides: {}",
|
||||
flag_key
|
||||
"Boolean flag cannot have overrides: {flag_key}"
|
||||
)));
|
||||
}
|
||||
GroupEvaluationResult::MatchedAndEvaluate => {
|
||||
@@ -471,8 +458,7 @@ impl FeatureStore {
|
||||
} else {
|
||||
// The feature flag is not available yet
|
||||
Err(PostHogEvaluationError::NotAvailable(format!(
|
||||
"Not found in the local evaluation spec: {}",
|
||||
flag_key
|
||||
"Not found in the local evaluation spec: {flag_key}"
|
||||
)))
|
||||
}
|
||||
}
|
||||
@@ -483,8 +469,7 @@ impl FeatureStore {
|
||||
Ok(flag_config.filters.multivariate.is_none())
|
||||
} else {
|
||||
Err(PostHogEvaluationError::NotAvailable(format!(
|
||||
"Not found in the local evaluation spec: {}",
|
||||
flag_key
|
||||
"Not found in the local evaluation spec: {flag_key}"
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -198,7 +198,7 @@ impl fmt::Display for CancelKeyData {
|
||||
|
||||
// This format is more compact and might work better for logs.
|
||||
f.debug_tuple("CancelKeyData")
|
||||
.field(&format_args!("{:x}", id))
|
||||
.field(&format_args!("{id:x}"))
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@@ -291,8 +291,7 @@ impl FeMessage {
|
||||
let len = (&buf[1..5]).read_u32::<BigEndian>().unwrap();
|
||||
if len < 4 {
|
||||
return Err(ProtocolError::Protocol(format!(
|
||||
"invalid message length {}",
|
||||
len
|
||||
"invalid message length {len}"
|
||||
)));
|
||||
}
|
||||
|
||||
@@ -367,8 +366,7 @@ impl FeStartupPacket {
|
||||
#[allow(clippy::manual_range_contains)]
|
||||
if len < 8 || len > MAX_STARTUP_PACKET_LENGTH {
|
||||
return Err(ProtocolError::Protocol(format!(
|
||||
"invalid startup packet message length {}",
|
||||
len
|
||||
"invalid startup packet message length {len}"
|
||||
)));
|
||||
}
|
||||
|
||||
|
||||
@@ -308,7 +308,7 @@ impl ScramSha256 {
|
||||
|
||||
let verifier = match parsed {
|
||||
ServerFinalMessage::Error(e) => {
|
||||
return Err(io::Error::other(format!("SCRAM error: {}", e)));
|
||||
return Err(io::Error::other(format!("SCRAM error: {e}")));
|
||||
}
|
||||
ServerFinalMessage::Verifier(verifier) => verifier,
|
||||
};
|
||||
@@ -343,10 +343,8 @@ impl<'a> Parser<'a> {
|
||||
match self.it.next() {
|
||||
Some((_, c)) if c == target => Ok(()),
|
||||
Some((i, c)) => {
|
||||
let m = format!(
|
||||
"unexpected character at byte {}: expected `{}` but got `{}",
|
||||
i, target, c
|
||||
);
|
||||
let m =
|
||||
format!("unexpected character at byte {i}: expected `{target}` but got `{c}");
|
||||
Err(io::Error::new(io::ErrorKind::InvalidInput, m))
|
||||
}
|
||||
None => Err(io::Error::new(
|
||||
@@ -412,7 +410,7 @@ impl<'a> Parser<'a> {
|
||||
match self.it.peek() {
|
||||
Some(&(i, _)) => Err(io::Error::new(
|
||||
io::ErrorKind::InvalidInput,
|
||||
format!("unexpected trailing data at byte {}", i),
|
||||
format!("unexpected trailing data at byte {i}"),
|
||||
)),
|
||||
None => Ok(()),
|
||||
}
|
||||
|
||||
@@ -211,7 +211,7 @@ impl Message {
|
||||
tag => {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::InvalidInput,
|
||||
format!("unknown authentication tag `{}`", tag),
|
||||
format!("unknown authentication tag `{tag}`"),
|
||||
));
|
||||
}
|
||||
},
|
||||
@@ -238,7 +238,7 @@ impl Message {
|
||||
tag => {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::InvalidInput,
|
||||
format!("unknown message tag `{}`", tag),
|
||||
format!("unknown message tag `{tag}`"),
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -46,7 +46,7 @@ impl fmt::Display for Type {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self.schema() {
|
||||
"public" | "pg_catalog" => {}
|
||||
schema => write!(fmt, "{}.", schema)?,
|
||||
schema => write!(fmt, "{schema}.")?,
|
||||
}
|
||||
fmt.write_str(self.name())
|
||||
}
|
||||
|
||||
@@ -12,7 +12,9 @@ use tokio::net::TcpStream;
|
||||
|
||||
use crate::connect::connect;
|
||||
use crate::connect_raw::{RawConnection, connect_raw};
|
||||
use crate::tls::{MakeTlsConnect, TlsConnect};
|
||||
use crate::connect_tls::connect_tls;
|
||||
use crate::maybe_tls_stream::MaybeTlsStream;
|
||||
use crate::tls::{MakeTlsConnect, TlsConnect, TlsStream};
|
||||
use crate::{Client, Connection, Error};
|
||||
|
||||
/// TLS configuration.
|
||||
@@ -238,7 +240,7 @@ impl Config {
|
||||
connect(tls, self).await
|
||||
}
|
||||
|
||||
pub async fn connect_raw<S, T>(
|
||||
pub async fn tls_and_authenticate<S, T>(
|
||||
&self,
|
||||
stream: S,
|
||||
tls: T,
|
||||
@@ -247,7 +249,19 @@ impl Config {
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
T: TlsConnect<S>,
|
||||
{
|
||||
connect_raw(stream, tls, self).await
|
||||
let stream = connect_tls(stream, self.ssl_mode, tls).await?;
|
||||
connect_raw(stream, self).await
|
||||
}
|
||||
|
||||
pub async fn authenticate<S, T>(
|
||||
&self,
|
||||
stream: MaybeTlsStream<S, T>,
|
||||
) -> Result<RawConnection<S, T>, Error>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
T: TlsStream + Unpin,
|
||||
{
|
||||
connect_raw(stream, self).await
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,14 +1,16 @@
|
||||
use std::net::IpAddr;
|
||||
|
||||
use postgres_protocol2::message::backend::Message;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use crate::client::SocketConfig;
|
||||
use crate::codec::BackendMessage;
|
||||
use crate::config::Host;
|
||||
use crate::config::{Host, SslMode};
|
||||
use crate::connect_raw::connect_raw;
|
||||
use crate::connect_socket::connect_socket;
|
||||
use crate::connect_tls::connect_tls;
|
||||
use crate::tls::{MakeTlsConnect, TlsConnect};
|
||||
use crate::{Client, Config, Connection, Error, RawConnection};
|
||||
|
||||
@@ -44,13 +46,8 @@ where
|
||||
T: TlsConnect<TcpStream>,
|
||||
{
|
||||
let socket = connect_socket(host_addr, host, port, config.connect_timeout).await?;
|
||||
let RawConnection {
|
||||
stream,
|
||||
parameters,
|
||||
delayed_notice,
|
||||
process_id,
|
||||
secret_key,
|
||||
} = connect_raw(socket, tls, config).await?;
|
||||
let stream = connect_tls(socket, config.ssl_mode, tls).await?;
|
||||
let raw = connect_raw(stream, config).await?;
|
||||
|
||||
let socket_config = SocketConfig {
|
||||
host_addr,
|
||||
@@ -59,24 +56,46 @@ where
|
||||
connect_timeout: config.connect_timeout,
|
||||
};
|
||||
|
||||
let (client_tx, conn_rx) = mpsc::unbounded_channel();
|
||||
let (conn_tx, client_rx) = mpsc::channel(4);
|
||||
let client = Client::new(
|
||||
client_tx,
|
||||
client_rx,
|
||||
socket_config,
|
||||
config.ssl_mode,
|
||||
process_id,
|
||||
secret_key,
|
||||
);
|
||||
|
||||
// delayed notices are always sent as "Async" messages.
|
||||
let delayed = delayed_notice
|
||||
.into_iter()
|
||||
.map(|m| BackendMessage::Async(Message::NoticeResponse(m)))
|
||||
.collect();
|
||||
|
||||
let connection = Connection::new(stream, delayed, parameters, conn_tx, conn_rx);
|
||||
|
||||
Ok((client, connection))
|
||||
Ok(raw.into_managed_conn(socket_config, config.ssl_mode))
|
||||
}
|
||||
|
||||
impl<S, T> RawConnection<S, T>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
T: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
pub fn into_managed_conn(
|
||||
self,
|
||||
socket_config: SocketConfig,
|
||||
ssl_mode: SslMode,
|
||||
) -> (Client, Connection<S, T>) {
|
||||
let RawConnection {
|
||||
stream,
|
||||
parameters,
|
||||
delayed_notice,
|
||||
process_id,
|
||||
secret_key,
|
||||
} = self;
|
||||
|
||||
let (client_tx, conn_rx) = mpsc::unbounded_channel();
|
||||
let (conn_tx, client_rx) = mpsc::channel(4);
|
||||
let client = Client::new(
|
||||
client_tx,
|
||||
client_rx,
|
||||
socket_config,
|
||||
ssl_mode,
|
||||
process_id,
|
||||
secret_key,
|
||||
);
|
||||
|
||||
// delayed notices are always sent as "Async" messages.
|
||||
let delayed = delayed_notice
|
||||
.into_iter()
|
||||
.map(|m| BackendMessage::Async(Message::NoticeResponse(m)))
|
||||
.collect();
|
||||
|
||||
let connection = Connection::new(stream, delayed, parameters, conn_tx, conn_rx);
|
||||
|
||||
(client, connection)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,9 +16,8 @@ use tokio_util::codec::Framed;
|
||||
use crate::Error;
|
||||
use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec};
|
||||
use crate::config::{self, AuthKeys, Config};
|
||||
use crate::connect_tls::connect_tls;
|
||||
use crate::maybe_tls_stream::MaybeTlsStream;
|
||||
use crate::tls::{TlsConnect, TlsStream};
|
||||
use crate::tls::TlsStream;
|
||||
|
||||
pub struct StartupStream<S, T> {
|
||||
inner: Framed<MaybeTlsStream<S, T>, PostgresCodec>,
|
||||
@@ -87,16 +86,13 @@ pub struct RawConnection<S, T> {
|
||||
}
|
||||
|
||||
pub async fn connect_raw<S, T>(
|
||||
stream: S,
|
||||
tls: T,
|
||||
stream: MaybeTlsStream<S, T>,
|
||||
config: &Config,
|
||||
) -> Result<RawConnection<S, T::Stream>, Error>
|
||||
) -> Result<RawConnection<S, T>, Error>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
T: TlsConnect<S>,
|
||||
T: TlsStream + Unpin,
|
||||
{
|
||||
let stream = connect_tls(stream, config.ssl_mode, tls).await?;
|
||||
|
||||
let mut stream = StartupStream {
|
||||
inner: Framed::new(stream, PostgresCodec),
|
||||
buf: BackendMessages::empty(),
|
||||
|
||||
@@ -332,10 +332,10 @@ impl fmt::Display for DbError {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(fmt, "{}: {}", self.severity, self.message)?;
|
||||
if let Some(detail) = &self.detail {
|
||||
write!(fmt, "\nDETAIL: {}", detail)?;
|
||||
write!(fmt, "\nDETAIL: {detail}")?;
|
||||
}
|
||||
if let Some(hint) = &self.hint {
|
||||
write!(fmt, "\nHINT: {}", hint)?;
|
||||
write!(fmt, "\nHINT: {hint}")?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -398,9 +398,9 @@ impl fmt::Display for Error {
|
||||
Kind::Io => fmt.write_str("error communicating with the server")?,
|
||||
Kind::UnexpectedMessage => fmt.write_str("unexpected message from server")?,
|
||||
Kind::Tls => fmt.write_str("error performing TLS handshake")?,
|
||||
Kind::ToSql(idx) => write!(fmt, "error serializing parameter {}", idx)?,
|
||||
Kind::FromSql(idx) => write!(fmt, "error deserializing column {}", idx)?,
|
||||
Kind::Column(column) => write!(fmt, "invalid column `{}`", column)?,
|
||||
Kind::ToSql(idx) => write!(fmt, "error serializing parameter {idx}")?,
|
||||
Kind::FromSql(idx) => write!(fmt, "error deserializing column {idx}")?,
|
||||
Kind::Column(column) => write!(fmt, "invalid column `{column}`")?,
|
||||
Kind::Closed => fmt.write_str("connection closed")?,
|
||||
Kind::Db => fmt.write_str("db error")?,
|
||||
Kind::Parse => fmt.write_str("error parsing response from server")?,
|
||||
@@ -411,7 +411,7 @@ impl fmt::Display for Error {
|
||||
Kind::Timeout => fmt.write_str("timeout waiting for server")?,
|
||||
};
|
||||
if let Some(ref cause) = self.0.cause {
|
||||
write!(fmt, ": {}", cause)?;
|
||||
write!(fmt, ": {cause}")?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -156,7 +156,7 @@ impl Row {
|
||||
{
|
||||
match self.get_inner(&idx) {
|
||||
Ok(ok) => ok,
|
||||
Err(err) => panic!("error retrieving column {}: {}", idx, err),
|
||||
Err(err) => panic!("error retrieving column {idx}: {err}"),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -274,7 +274,7 @@ impl SimpleQueryRow {
|
||||
{
|
||||
match self.get_inner(&idx) {
|
||||
Ok(ok) => ok,
|
||||
Err(err) => panic!("error retrieving column {}: {}", idx, err),
|
||||
Err(err) => panic!("error retrieving column {idx}: {err}"),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -400,7 +400,7 @@ impl RemoteStorage for LocalFs {
|
||||
key
|
||||
};
|
||||
|
||||
let relative_key = format!("{}", relative_key);
|
||||
let relative_key = format!("{relative_key}");
|
||||
if relative_key.contains(REMOTE_STORAGE_PREFIX_SEPARATOR) {
|
||||
let first_part = relative_key
|
||||
.split(REMOTE_STORAGE_PREFIX_SEPARATOR)
|
||||
@@ -594,13 +594,9 @@ impl RemoteStorage for LocalFs {
|
||||
let from_path = from.with_base(&self.storage_root);
|
||||
let to_path = to.with_base(&self.storage_root);
|
||||
create_target_directory(&to_path).await?;
|
||||
fs::copy(&from_path, &to_path).await.with_context(|| {
|
||||
format!(
|
||||
"Failed to copy file from '{from_path}' to '{to_path}'",
|
||||
from_path = from_path,
|
||||
to_path = to_path
|
||||
)
|
||||
})?;
|
||||
fs::copy(&from_path, &to_path)
|
||||
.await
|
||||
.with_context(|| format!("Failed to copy file from '{from_path}' to '{to_path}'"))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1183,7 +1179,7 @@ mod fs_tests {
|
||||
.write(true)
|
||||
.create_new(true)
|
||||
.open(path)?;
|
||||
write!(file_for_writing, "{}", contents)?;
|
||||
write!(file_for_writing, "{contents}")?;
|
||||
drop(file_for_writing);
|
||||
let file_size = path.metadata()?.len() as usize;
|
||||
Ok((
|
||||
|
||||
@@ -193,10 +193,10 @@ mod tests {
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
println!("members: {}", members);
|
||||
println!("members: {members}");
|
||||
|
||||
let j = serde_json::to_string(&members).expect("failed to serialize");
|
||||
println!("members json: {}", j);
|
||||
println!("members json: {j}");
|
||||
assert_eq!(
|
||||
j,
|
||||
r#"[{"id":42,"host":"lala.org","pg_port":5432},{"id":43,"host":"bubu.org","pg_port":5432}]"#
|
||||
|
||||
@@ -41,7 +41,7 @@ pub fn report_compact_sources<E: std::error::Error>(e: &E) -> impl std::fmt::Dis
|
||||
// why is E a generic parameter here? hope that rustc will see through a default
|
||||
// Error::source implementation and leave the following out if there cannot be any
|
||||
// sources:
|
||||
Sources(self.0.source()).try_for_each(|src| write!(f, ": {}", src))
|
||||
Sources(self.0.source()).try_for_each(|src| write!(f, ": {src}"))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -135,7 +135,7 @@ impl Debug for Generation {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::Valid(v) => {
|
||||
write!(f, "{:08x}", v)
|
||||
write!(f, "{v:08x}")
|
||||
}
|
||||
Self::None => {
|
||||
write!(f, "<none>")
|
||||
|
||||
@@ -280,7 +280,7 @@ impl TryFrom<Option<&str>> for TimelineId {
|
||||
value
|
||||
.unwrap_or_default()
|
||||
.parse::<TimelineId>()
|
||||
.with_context(|| format!("Could not parse timeline id from {:?}", value))
|
||||
.with_context(|| format!("Could not parse timeline id from {value:?}"))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -89,7 +89,7 @@ pub fn wal_stream_connection_config(
|
||||
.set_password(args.auth_token.map(|s| s.to_owned()));
|
||||
|
||||
if let Some(availability_zone) = args.availability_zone {
|
||||
connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]);
|
||||
connstr = connstr.extend_options([format!("availability_zone={availability_zone}")]);
|
||||
}
|
||||
|
||||
Ok(connstr)
|
||||
|
||||
@@ -196,7 +196,7 @@ impl std::fmt::Display for TenantShardId {
|
||||
impl std::fmt::Debug for TenantShardId {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
// Debug is the same as Display: the compact hex representation
|
||||
write!(f, "{}", self)
|
||||
write!(f, "{self}")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -284,7 +284,7 @@ impl std::fmt::Display for ShardIndex {
|
||||
impl std::fmt::Debug for ShardIndex {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
// Debug is the same as Display: the compact hex representation
|
||||
write!(f, "{}", self)
|
||||
write!(f, "{self}")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -29,7 +29,7 @@ impl ShutdownSignals {
|
||||
SIGINT => Signal::Interrupt,
|
||||
SIGTERM => Signal::Terminate,
|
||||
SIGQUIT => Signal::Quit,
|
||||
other => panic!("unknown signal: {}", other),
|
||||
other => panic!("unknown signal: {other}"),
|
||||
};
|
||||
|
||||
handler(signal)?;
|
||||
|
||||
@@ -90,8 +90,7 @@ impl Dispatcher {
|
||||
Err(e) => {
|
||||
sink.send(Message::Text(Utf8Bytes::from(
|
||||
serde_json::to_string(&ProtocolResponse::Error(format!(
|
||||
"Received protocol version range {} which does not overlap with {}",
|
||||
agent_range, monitor_range
|
||||
"Received protocol version range {agent_range} which does not overlap with {monitor_range}"
|
||||
)))
|
||||
.unwrap(),
|
||||
)))
|
||||
|
||||
@@ -285,7 +285,7 @@ impl FileCacheState {
|
||||
// why we're constructing the query here.
|
||||
self.client
|
||||
.query(
|
||||
&format!("ALTER SYSTEM SET neon.file_cache_size_limit = {};", num_mb),
|
||||
&format!("ALTER SYSTEM SET neon.file_cache_size_limit = {num_mb};"),
|
||||
&[],
|
||||
)
|
||||
.await
|
||||
|
||||
@@ -64,7 +64,7 @@ async fn download_bench_data(
|
||||
let temp_dir_parent: Utf8PathBuf = env::current_dir().unwrap().try_into()?;
|
||||
let temp_dir = camino_tempfile::tempdir_in(temp_dir_parent)?;
|
||||
|
||||
eprintln!("Downloading benchmark data to {:?}", temp_dir);
|
||||
eprintln!("Downloading benchmark data to {temp_dir:?}");
|
||||
|
||||
let listing = client
|
||||
.list(None, ListingMode::NoDelimiter, None, cancel)
|
||||
@@ -120,7 +120,7 @@ struct BenchmarkMetadata {
|
||||
}
|
||||
|
||||
async fn load_bench_data(path: &Utf8Path, input_size: usize) -> anyhow::Result<BenchmarkData> {
|
||||
eprintln!("Loading benchmark data from {:?}", path);
|
||||
eprintln!("Loading benchmark data from {path:?}");
|
||||
|
||||
let mut entries = tokio::fs::read_dir(path).await?;
|
||||
let mut ordered_segment_paths = Vec::new();
|
||||
|
||||
@@ -6,6 +6,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
// the build then. Anyway, per cargo docs build script shouldn't output to
|
||||
// anywhere but $OUT_DIR.
|
||||
tonic_build::compile_protos("proto/interpreted_wal.proto")
|
||||
.unwrap_or_else(|e| panic!("failed to compile protos {:?}", e));
|
||||
.unwrap_or_else(|e| panic!("failed to compile protos {e:?}"));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -128,6 +128,6 @@ pub fn describe_wal_record(rec: &NeonWalRecord) -> Result<String, DeserializeErr
|
||||
will_init,
|
||||
describe_postgres_wal_record(rec)?
|
||||
)),
|
||||
_ => Ok(format!("{:?}", rec)),
|
||||
_ => Ok(format!("{rec:?}")),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -376,7 +376,7 @@ impl Level {
|
||||
FATAL => Level::Fatal,
|
||||
PANIC => Level::Panic,
|
||||
WPEVENT => Level::WPEvent,
|
||||
_ => panic!("unknown log level {}", elevel),
|
||||
_ => panic!("unknown log level {elevel}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -446,7 +446,7 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
|
||||
|
||||
impl std::fmt::Display for Level {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(f, "{:?}", self)
|
||||
write!(f, "{self:?}")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -380,7 +380,7 @@ mod tests {
|
||||
}
|
||||
|
||||
fn conn_send_query(&self, _: &mut crate::bindings::Safekeeper, query: &str) -> bool {
|
||||
println!("conn_send_query: {}", query);
|
||||
println!("conn_send_query: {query}");
|
||||
true
|
||||
}
|
||||
|
||||
@@ -399,13 +399,13 @@ mod tests {
|
||||
) -> crate::bindings::PGAsyncReadResult {
|
||||
println!("conn_async_read");
|
||||
let reply = self.next_safekeeper_reply();
|
||||
println!("conn_async_read result: {:?}", reply);
|
||||
println!("conn_async_read result: {reply:?}");
|
||||
vec.extend_from_slice(reply);
|
||||
crate::bindings::PGAsyncReadResult_PG_ASYNC_READ_SUCCESS
|
||||
}
|
||||
|
||||
fn conn_blocking_write(&self, _: &mut crate::bindings::Safekeeper, buf: &[u8]) -> bool {
|
||||
println!("conn_blocking_write: {:?}", buf);
|
||||
println!("conn_blocking_write: {buf:?}");
|
||||
self.check_walproposer_msg(buf);
|
||||
true
|
||||
}
|
||||
@@ -456,10 +456,7 @@ mod tests {
|
||||
timeout_millis: i64,
|
||||
) -> super::WaitResult {
|
||||
let data = self.wait_events.get();
|
||||
println!(
|
||||
"wait_event_set, timeout_millis={}, res={:?}",
|
||||
timeout_millis, data
|
||||
);
|
||||
println!("wait_event_set, timeout_millis={timeout_millis}, res={data:?}");
|
||||
super::WaitResult::Network(data.sk, data.event_mask)
|
||||
}
|
||||
|
||||
@@ -475,7 +472,7 @@ mod tests {
|
||||
}
|
||||
|
||||
fn log_internal(&self, _wp: &mut crate::bindings::WalProposer, level: Level, msg: &str) {
|
||||
println!("wp_log[{}] {}", level, msg);
|
||||
println!("wp_log[{level}] {msg}");
|
||||
}
|
||||
|
||||
fn after_election(&self, _wp: &mut crate::bindings::WalProposer) {
|
||||
|
||||
@@ -12,6 +12,9 @@ testing = ["fail/failpoints", "pageserver_api/testing", "wal_decoder/testing", "
|
||||
|
||||
fuzz-read-path = ["testing"]
|
||||
|
||||
# Enables benchmarking only APIs
|
||||
benchmarking = []
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
arc-swap.workspace = true
|
||||
@@ -127,6 +130,7 @@ harness = false
|
||||
[[bench]]
|
||||
name = "bench_ingest"
|
||||
harness = false
|
||||
required-features = ["benchmarking"]
|
||||
|
||||
[[bench]]
|
||||
name = "upload_queue"
|
||||
|
||||
@@ -1,22 +1,29 @@
|
||||
use std::env;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::Arc;
|
||||
|
||||
use bytes::Bytes;
|
||||
use camino::Utf8PathBuf;
|
||||
use criterion::{Criterion, criterion_group, criterion_main};
|
||||
use futures::stream::FuturesUnordered;
|
||||
use pageserver::config::PageServerConf;
|
||||
use pageserver::context::{DownloadBehavior, RequestContext};
|
||||
use pageserver::keyspace::KeySpace;
|
||||
use pageserver::l0_flush::{L0FlushConfig, L0FlushGlobalState};
|
||||
use pageserver::task_mgr::TaskKind;
|
||||
use pageserver::tenant::storage_layer::InMemoryLayer;
|
||||
use pageserver::tenant::storage_layer::IoConcurrency;
|
||||
use pageserver::tenant::storage_layer::{InMemoryLayer, ValuesReconstructState};
|
||||
use pageserver::{page_cache, virtual_file};
|
||||
use pageserver_api::config::GetVectoredConcurrentIo;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::models::virtual_file::IoMode;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use strum::IntoEnumIterator;
|
||||
use tokio_stream::StreamExt;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::bin_ser::BeSer;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::sync::gate::Gate;
|
||||
use wal_decoder::models::value::Value;
|
||||
use wal_decoder::serialized_batch::SerializedValueBatch;
|
||||
|
||||
@@ -30,7 +37,7 @@ fn murmurhash32(mut h: u32) -> u32 {
|
||||
h
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize, Clone, Copy, Debug)]
|
||||
#[derive(serde::Serialize, Clone, Copy, Debug, PartialEq)]
|
||||
enum KeyLayout {
|
||||
/// Sequential unique keys
|
||||
Sequential,
|
||||
@@ -40,19 +47,30 @@ enum KeyLayout {
|
||||
RandomReuse(u32),
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize, Clone, Copy, Debug)]
|
||||
#[derive(serde::Serialize, Clone, Copy, Debug, PartialEq)]
|
||||
enum WriteDelta {
|
||||
Yes,
|
||||
No,
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize, Clone, Copy, Debug, PartialEq)]
|
||||
enum ConcurrentReads {
|
||||
Yes,
|
||||
No,
|
||||
}
|
||||
|
||||
async fn ingest(
|
||||
conf: &'static PageServerConf,
|
||||
put_size: usize,
|
||||
put_count: usize,
|
||||
key_layout: KeyLayout,
|
||||
write_delta: WriteDelta,
|
||||
concurrent_reads: ConcurrentReads,
|
||||
) -> anyhow::Result<()> {
|
||||
if concurrent_reads == ConcurrentReads::Yes {
|
||||
assert_eq!(key_layout, KeyLayout::Sequential);
|
||||
}
|
||||
|
||||
let mut lsn = utils::lsn::Lsn(1000);
|
||||
let mut key = Key::from_i128(0x0);
|
||||
|
||||
@@ -68,16 +86,18 @@ async fn ingest(
|
||||
let gate = utils::sync::gate::Gate::default();
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let layer = InMemoryLayer::create(
|
||||
conf,
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
lsn,
|
||||
&gate,
|
||||
&cancel,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
let layer = Arc::new(
|
||||
InMemoryLayer::create(
|
||||
conf,
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
lsn,
|
||||
&gate,
|
||||
&cancel,
|
||||
&ctx,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
|
||||
let data = Value::Image(Bytes::from(vec![0u8; put_size]));
|
||||
let data_ser_size = data.serialized_size().unwrap() as usize;
|
||||
@@ -86,6 +106,61 @@ async fn ingest(
|
||||
pageserver::context::DownloadBehavior::Download,
|
||||
);
|
||||
|
||||
const READ_BATCH_SIZE: u32 = 32;
|
||||
let (tx, mut rx) = tokio::sync::watch::channel::<Option<Key>>(None);
|
||||
let reader_cancel = CancellationToken::new();
|
||||
let reader_handle = if concurrent_reads == ConcurrentReads::Yes {
|
||||
Some(tokio::task::spawn({
|
||||
let cancel = reader_cancel.clone();
|
||||
let layer = layer.clone();
|
||||
let ctx = ctx.attached_child();
|
||||
async move {
|
||||
let gate = Gate::default();
|
||||
let gate_guard = gate.enter().unwrap();
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
GetVectoredConcurrentIo::SidecarTask,
|
||||
gate_guard,
|
||||
);
|
||||
|
||||
rx.wait_for(|key| key.is_some()).await.unwrap();
|
||||
|
||||
while !cancel.is_cancelled() {
|
||||
let key = match *rx.borrow() {
|
||||
Some(some) => some,
|
||||
None => unreachable!(),
|
||||
};
|
||||
|
||||
let mut start_key = key;
|
||||
start_key.field6 = key.field6.saturating_sub(READ_BATCH_SIZE);
|
||||
let key_range = start_key..key.next();
|
||||
|
||||
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency.clone());
|
||||
|
||||
layer
|
||||
.get_values_reconstruct_data(
|
||||
KeySpace::single(key_range),
|
||||
Lsn(1)..Lsn(u64::MAX),
|
||||
&mut reconstruct_state,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut collect_futs = std::mem::take(&mut reconstruct_state.keys)
|
||||
.into_values()
|
||||
.map(|state| state.sink_pending_ios())
|
||||
.collect::<FuturesUnordered<_>>();
|
||||
while collect_futs.next().await.is_some() {}
|
||||
}
|
||||
|
||||
drop(io_concurrency);
|
||||
gate.close().await;
|
||||
}
|
||||
}))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
const BATCH_SIZE: usize = 16;
|
||||
let mut batch = Vec::new();
|
||||
|
||||
@@ -113,19 +188,27 @@ async fn ingest(
|
||||
|
||||
batch.push((key.to_compact(), lsn, data_ser_size, data.clone()));
|
||||
if batch.len() >= BATCH_SIZE {
|
||||
let last_key = Key::from_compact(batch.last().unwrap().0);
|
||||
|
||||
let this_batch = std::mem::take(&mut batch);
|
||||
let serialized = SerializedValueBatch::from_values(this_batch);
|
||||
layer.put_batch(serialized, &ctx).await?;
|
||||
|
||||
tx.send(Some(last_key)).unwrap();
|
||||
}
|
||||
}
|
||||
if !batch.is_empty() {
|
||||
let last_key = Key::from_compact(batch.last().unwrap().0);
|
||||
|
||||
let this_batch = std::mem::take(&mut batch);
|
||||
let serialized = SerializedValueBatch::from_values(this_batch);
|
||||
layer.put_batch(serialized, &ctx).await?;
|
||||
|
||||
tx.send(Some(last_key)).unwrap();
|
||||
}
|
||||
layer.freeze(lsn + 1).await;
|
||||
|
||||
if matches!(write_delta, WriteDelta::Yes) {
|
||||
if write_delta == WriteDelta::Yes {
|
||||
let l0_flush_state = L0FlushGlobalState::new(L0FlushConfig::Direct {
|
||||
max_concurrency: NonZeroUsize::new(1).unwrap(),
|
||||
});
|
||||
@@ -136,6 +219,11 @@ async fn ingest(
|
||||
tokio::fs::remove_file(path).await?;
|
||||
}
|
||||
|
||||
reader_cancel.cancel();
|
||||
if let Some(handle) = reader_handle {
|
||||
handle.await.unwrap();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -147,6 +235,7 @@ fn ingest_main(
|
||||
put_count: usize,
|
||||
key_layout: KeyLayout,
|
||||
write_delta: WriteDelta,
|
||||
concurrent_reads: ConcurrentReads,
|
||||
) {
|
||||
pageserver::virtual_file::set_io_mode(io_mode);
|
||||
|
||||
@@ -156,7 +245,15 @@ fn ingest_main(
|
||||
.unwrap();
|
||||
|
||||
runtime.block_on(async move {
|
||||
let r = ingest(conf, put_size, put_count, key_layout, write_delta).await;
|
||||
let r = ingest(
|
||||
conf,
|
||||
put_size,
|
||||
put_count,
|
||||
key_layout,
|
||||
write_delta,
|
||||
concurrent_reads,
|
||||
)
|
||||
.await;
|
||||
if let Err(e) = r {
|
||||
panic!("{e:?}");
|
||||
}
|
||||
@@ -195,6 +292,7 @@ fn criterion_benchmark(c: &mut Criterion) {
|
||||
key_size: usize,
|
||||
key_layout: KeyLayout,
|
||||
write_delta: WriteDelta,
|
||||
concurrent_reads: ConcurrentReads,
|
||||
}
|
||||
#[derive(Clone)]
|
||||
struct HandPickedParameters {
|
||||
@@ -245,7 +343,7 @@ fn criterion_benchmark(c: &mut Criterion) {
|
||||
];
|
||||
let exploded_parameters = {
|
||||
let mut out = Vec::new();
|
||||
for io_mode in IoMode::iter() {
|
||||
for concurrent_reads in [ConcurrentReads::Yes, ConcurrentReads::No] {
|
||||
for param in expect.clone() {
|
||||
let HandPickedParameters {
|
||||
volume_mib,
|
||||
@@ -253,12 +351,18 @@ fn criterion_benchmark(c: &mut Criterion) {
|
||||
key_layout,
|
||||
write_delta,
|
||||
} = param;
|
||||
|
||||
if key_layout != KeyLayout::Sequential && concurrent_reads == ConcurrentReads::Yes {
|
||||
continue;
|
||||
}
|
||||
|
||||
out.push(ExplodedParameters {
|
||||
io_mode,
|
||||
io_mode: IoMode::DirectRw,
|
||||
volume_mib,
|
||||
key_size,
|
||||
key_layout,
|
||||
write_delta,
|
||||
concurrent_reads,
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -272,9 +376,10 @@ fn criterion_benchmark(c: &mut Criterion) {
|
||||
key_size,
|
||||
key_layout,
|
||||
write_delta,
|
||||
concurrent_reads,
|
||||
} = self;
|
||||
format!(
|
||||
"io_mode={io_mode:?} volume_mib={volume_mib:?} key_size_bytes={key_size:?} key_layout={key_layout:?} write_delta={write_delta:?}"
|
||||
"io_mode={io_mode:?} volume_mib={volume_mib:?} key_size_bytes={key_size:?} key_layout={key_layout:?} write_delta={write_delta:?} concurrent_reads={concurrent_reads:?}"
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -287,12 +392,23 @@ fn criterion_benchmark(c: &mut Criterion) {
|
||||
key_size,
|
||||
key_layout,
|
||||
write_delta,
|
||||
concurrent_reads,
|
||||
} = params;
|
||||
let put_count = volume_mib * 1024 * 1024 / key_size;
|
||||
group.throughput(criterion::Throughput::Bytes((key_size * put_count) as u64));
|
||||
group.sample_size(10);
|
||||
group.bench_function(id, |b| {
|
||||
b.iter(|| ingest_main(conf, io_mode, key_size, put_count, key_layout, write_delta))
|
||||
b.iter(|| {
|
||||
ingest_main(
|
||||
conf,
|
||||
io_mode,
|
||||
key_size,
|
||||
put_count,
|
||||
key_layout,
|
||||
write_delta,
|
||||
concurrent_reads,
|
||||
)
|
||||
})
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -509,11 +509,11 @@ impl Client {
|
||||
.expect("Cannot build URL");
|
||||
|
||||
path.query_pairs_mut()
|
||||
.append_pair("recurse", &format!("{}", recurse));
|
||||
.append_pair("recurse", &format!("{recurse}"));
|
||||
|
||||
if let Some(concurrency) = concurrency {
|
||||
path.query_pairs_mut()
|
||||
.append_pair("concurrency", &format!("{}", concurrency));
|
||||
.append_pair("concurrency", &format!("{concurrency}"));
|
||||
}
|
||||
|
||||
self.request(Method::POST, path, ()).await.map(|_| ())
|
||||
|
||||
@@ -152,7 +152,7 @@ pub fn draw_history<W: std::io::Write>(history: &[LayerTraceEvent], mut output:
|
||||
let key_diff = key_end - key_start;
|
||||
|
||||
if key_start >= key_end {
|
||||
panic!("Invalid key range {}-{}", key_start, key_end);
|
||||
panic!("Invalid key range {key_start}-{key_end}");
|
||||
}
|
||||
|
||||
let lsn_start = lsn_map.map(f.lsn_range.start);
|
||||
@@ -212,12 +212,12 @@ pub fn draw_history<W: std::io::Write>(history: &[LayerTraceEvent], mut output:
|
||||
)?;
|
||||
writeln!(svg, "</line>")?;
|
||||
}
|
||||
Ordering::Greater => panic!("Invalid lsn range {}-{}", lsn_start, lsn_end),
|
||||
Ordering::Greater => panic!("Invalid lsn range {lsn_start}-{lsn_end}"),
|
||||
}
|
||||
files_seen.insert(f);
|
||||
}
|
||||
|
||||
writeln!(svg, "{}", EndSvg)?;
|
||||
writeln!(svg, "{EndSvg}")?;
|
||||
|
||||
let mut layer_events_str = String::new();
|
||||
let mut first = true;
|
||||
|
||||
@@ -228,7 +228,7 @@ pub fn main() -> Result<()> {
|
||||
let lsn_max = lsn_map.len();
|
||||
|
||||
if key_start >= key_end {
|
||||
panic!("Invalid key range {}-{}", key_start, key_end);
|
||||
panic!("Invalid key range {key_start}-{key_end}");
|
||||
}
|
||||
|
||||
let lsn_start = *lsn_map.get(&lsnr.start).unwrap();
|
||||
@@ -250,7 +250,7 @@ pub fn main() -> Result<()> {
|
||||
ymargin = 0.05;
|
||||
fill = Fill::Color(rgb(0, 0, 0));
|
||||
}
|
||||
Ordering::Greater => panic!("Invalid lsn range {}-{}", lsn_start, lsn_end),
|
||||
Ordering::Greater => panic!("Invalid lsn range {lsn_start}-{lsn_end}"),
|
||||
}
|
||||
|
||||
println!(
|
||||
@@ -287,10 +287,10 @@ pub fn main() -> Result<()> {
|
||||
);
|
||||
}
|
||||
|
||||
println!("{}", EndSvg);
|
||||
println!("{EndSvg}");
|
||||
|
||||
eprintln!("num_images: {}", num_images);
|
||||
eprintln!("num_deltas: {}", num_deltas);
|
||||
eprintln!("num_images: {num_images}");
|
||||
eprintln!("num_deltas: {num_deltas}");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -372,7 +372,7 @@ impl<const N: usize> std::fmt::Debug for RelTagish<N> {
|
||||
f.write_char('/')?;
|
||||
}
|
||||
first = false;
|
||||
write!(f, "{}", x)
|
||||
write!(f, "{x}")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -224,8 +224,7 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
|
||||
}
|
||||
}
|
||||
println!(
|
||||
"Total delta layers {} image layers {} excess layers {}",
|
||||
total_delta_layers, total_image_layers, total_excess_layers
|
||||
"Total delta layers {total_delta_layers} image layers {total_image_layers} excess layers {total_excess_layers}"
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -131,7 +131,7 @@ impl Client {
|
||||
let domain_stream = response_stream.map(|chunk_res| {
|
||||
chunk_res.and_then(|proto_chunk| {
|
||||
proto_chunk.try_into().map_err(|e| {
|
||||
tonic::Status::internal(format!("Failed to convert response chunk: {}", e))
|
||||
tonic::Status::internal(format!("Failed to convert response chunk: {e}"))
|
||||
})
|
||||
})
|
||||
});
|
||||
|
||||
@@ -62,7 +62,7 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
|
||||
let tenant_shard_id = TenantShardId::unsharded(timeline.tenant_id);
|
||||
let timeline_id = timeline.timeline_id;
|
||||
|
||||
println!("operating on timeline {}", timeline);
|
||||
println!("operating on timeline {timeline}");
|
||||
|
||||
mgmt_api_client
|
||||
.set_tenant_config(&TenantConfigRequest {
|
||||
@@ -75,8 +75,8 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
|
||||
let items = (0..100)
|
||||
.map(|id| {
|
||||
(
|
||||
format!("pg_logical/mappings/{:03}.{:03}", batch, id),
|
||||
format!("{:08}", id),
|
||||
format!("pg_logical/mappings/{batch:03}.{id:03}"),
|
||||
format!("{id:08}"),
|
||||
)
|
||||
})
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
@@ -667,7 +667,7 @@ where
|
||||
}
|
||||
|
||||
// Append dir path for each database
|
||||
let path = format!("base/{}", dbnode);
|
||||
let path = format!("base/{dbnode}");
|
||||
let header = new_tar_header_dir(&path)?;
|
||||
self.ar
|
||||
.append(&header, io::empty())
|
||||
@@ -675,7 +675,7 @@ where
|
||||
.map_err(|e| BasebackupError::Client(e, "add_dbdir,base"))?;
|
||||
|
||||
if let Some(img) = relmap_img {
|
||||
let dst_path = format!("base/{}/PG_VERSION", dbnode);
|
||||
let dst_path = format!("base/{dbnode}/PG_VERSION");
|
||||
|
||||
let pg_version_str = self.timeline.pg_version.versionfile_string();
|
||||
let header = new_tar_header(&dst_path, pg_version_str.len() as u64)?;
|
||||
@@ -684,7 +684,7 @@ where
|
||||
.await
|
||||
.map_err(|e| BasebackupError::Client(e, "add_dbdir,base/PG_VERSION"))?;
|
||||
|
||||
let relmap_path = format!("base/{}/pg_filenode.map", dbnode);
|
||||
let relmap_path = format!("base/{dbnode}/pg_filenode.map");
|
||||
let header = new_tar_header(&relmap_path, img.len() as u64)?;
|
||||
self.ar
|
||||
.append(&header, &img[..])
|
||||
@@ -709,9 +709,9 @@ where
|
||||
let crc = crc32c::crc32c(&img[..]);
|
||||
buf.put_u32_le(crc);
|
||||
let path = if self.timeline.pg_version < PgMajorVersion::PG17 {
|
||||
format!("pg_twophase/{:>08X}", xid)
|
||||
format!("pg_twophase/{xid:>08X}")
|
||||
} else {
|
||||
format!("pg_twophase/{:>016X}", xid)
|
||||
format!("pg_twophase/{xid:>016X}")
|
||||
};
|
||||
let header = new_tar_header(&path, buf.len() as u64)?;
|
||||
self.ar
|
||||
@@ -763,7 +763,7 @@ where
|
||||
//send wal segment
|
||||
let segno = self.lsn.segment_number(WAL_SEGMENT_SIZE);
|
||||
let wal_file_name = XLogFileName(PG_TLI, segno, WAL_SEGMENT_SIZE);
|
||||
let wal_file_path = format!("pg_wal/{}", wal_file_name);
|
||||
let wal_file_path = format!("pg_wal/{wal_file_name}");
|
||||
let header = new_tar_header(&wal_file_path, WAL_SEGMENT_SIZE as u64)?;
|
||||
|
||||
let wal_seg = postgres_ffi::generate_wal_segment(
|
||||
|
||||
@@ -583,7 +583,7 @@ fn start_pageserver(
|
||||
deletion_queue_client,
|
||||
l0_flush_global_state,
|
||||
basebackup_prepare_sender,
|
||||
feature_resolver,
|
||||
feature_resolver: feature_resolver.clone(),
|
||||
},
|
||||
shutdown_pageserver.clone(),
|
||||
);
|
||||
@@ -715,6 +715,7 @@ fn start_pageserver(
|
||||
disk_usage_eviction_state,
|
||||
deletion_queue.new_client(),
|
||||
secondary_controller,
|
||||
feature_resolver,
|
||||
)
|
||||
.context("Failed to initialize router state")?,
|
||||
);
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::{collections::HashMap, sync::Arc, time::Duration};
|
||||
|
||||
use arc_swap::ArcSwap;
|
||||
use pageserver_api::config::NodeMetadata;
|
||||
use posthog_client_lite::{
|
||||
CaptureEvent, FeatureResolverBackgroundLoop, PostHogClientConfig, PostHogEvaluationError,
|
||||
@@ -12,10 +13,13 @@ use utils::id::TenantId;
|
||||
|
||||
use crate::{config::PageServerConf, metrics::FEATURE_FLAG_EVALUATION};
|
||||
|
||||
const DEFAULT_POSTHOG_REFRESH_INTERVAL: Duration = Duration::from_secs(600);
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct FeatureResolver {
|
||||
inner: Option<Arc<FeatureResolverBackgroundLoop>>,
|
||||
internal_properties: Option<Arc<HashMap<String, PostHogFlagFilterPropertyValue>>>,
|
||||
force_overrides_for_testing: Arc<ArcSwap<HashMap<String, String>>>,
|
||||
}
|
||||
|
||||
impl FeatureResolver {
|
||||
@@ -23,6 +27,7 @@ impl FeatureResolver {
|
||||
Self {
|
||||
inner: None,
|
||||
internal_properties: None,
|
||||
force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -139,18 +144,23 @@ impl FeatureResolver {
|
||||
}
|
||||
tenants
|
||||
};
|
||||
// TODO: make refresh period configurable
|
||||
inner
|
||||
.clone()
|
||||
.spawn(handle, Duration::from_secs(60), fake_tenants);
|
||||
inner.clone().spawn(
|
||||
handle,
|
||||
posthog_config
|
||||
.refresh_interval
|
||||
.unwrap_or(DEFAULT_POSTHOG_REFRESH_INTERVAL),
|
||||
fake_tenants,
|
||||
);
|
||||
Ok(FeatureResolver {
|
||||
inner: Some(inner),
|
||||
internal_properties: Some(internal_properties),
|
||||
force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
|
||||
})
|
||||
} else {
|
||||
Ok(FeatureResolver {
|
||||
inner: None,
|
||||
internal_properties: None,
|
||||
force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -190,6 +200,11 @@ impl FeatureResolver {
|
||||
flag_key: &str,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<String, PostHogEvaluationError> {
|
||||
let force_overrides = self.force_overrides_for_testing.load();
|
||||
if let Some(value) = force_overrides.get(flag_key) {
|
||||
return Ok(value.clone());
|
||||
}
|
||||
|
||||
if let Some(inner) = &self.inner {
|
||||
let res = inner.feature_store().evaluate_multivariate(
|
||||
flag_key,
|
||||
@@ -228,6 +243,15 @@ impl FeatureResolver {
|
||||
flag_key: &str,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<(), PostHogEvaluationError> {
|
||||
let force_overrides = self.force_overrides_for_testing.load();
|
||||
if let Some(value) = force_overrides.get(flag_key) {
|
||||
return if value == "true" {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(PostHogEvaluationError::NoConditionGroupMatched)
|
||||
};
|
||||
}
|
||||
|
||||
if let Some(inner) = &self.inner {
|
||||
let res = inner.feature_store().evaluate_boolean(
|
||||
flag_key,
|
||||
@@ -259,8 +283,22 @@ impl FeatureResolver {
|
||||
inner.feature_store().is_feature_flag_boolean(flag_key)
|
||||
} else {
|
||||
Err(PostHogEvaluationError::NotAvailable(
|
||||
"PostHog integration is not enabled".to_string(),
|
||||
"PostHog integration is not enabled, cannot auto-determine the flag type"
|
||||
.to_string(),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// Force override a feature flag for testing. This is only for testing purposes. Assume the caller only call it
|
||||
/// from a single thread so it won't race.
|
||||
pub fn force_override_for_testing(&self, flag_key: &str, value: Option<&str>) {
|
||||
let mut force_overrides = self.force_overrides_for_testing.load().as_ref().clone();
|
||||
if let Some(value) = value {
|
||||
force_overrides.insert(flag_key.to_string(), value.to_string());
|
||||
} else {
|
||||
force_overrides.remove(flag_key);
|
||||
}
|
||||
self.force_overrides_for_testing
|
||||
.store(Arc::new(force_overrides));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -60,6 +60,7 @@ use crate::config::PageServerConf;
|
||||
use crate::context;
|
||||
use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
|
||||
use crate::deletion_queue::DeletionQueueClient;
|
||||
use crate::feature_resolver::FeatureResolver;
|
||||
use crate::pgdatadir_mapping::LsnForTimestamp;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::config::LocationConf;
|
||||
@@ -108,6 +109,7 @@ pub struct State {
|
||||
deletion_queue_client: DeletionQueueClient,
|
||||
secondary_controller: SecondaryController,
|
||||
latest_utilization: tokio::sync::Mutex<Option<(std::time::Instant, bytes::Bytes)>>,
|
||||
feature_resolver: FeatureResolver,
|
||||
}
|
||||
|
||||
impl State {
|
||||
@@ -121,6 +123,7 @@ impl State {
|
||||
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
|
||||
deletion_queue_client: DeletionQueueClient,
|
||||
secondary_controller: SecondaryController,
|
||||
feature_resolver: FeatureResolver,
|
||||
) -> anyhow::Result<Self> {
|
||||
let allowlist_routes = &[
|
||||
"/v1/status",
|
||||
@@ -141,6 +144,7 @@ impl State {
|
||||
deletion_queue_client,
|
||||
secondary_controller,
|
||||
latest_utilization: Default::default(),
|
||||
feature_resolver,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -284,11 +288,11 @@ impl From<GetActiveTenantError> for ApiError {
|
||||
GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
|
||||
ApiError::ShuttingDown
|
||||
}
|
||||
GetActiveTenantError::WillNotBecomeActive(_) => ApiError::Conflict(format!("{}", e)),
|
||||
GetActiveTenantError::WillNotBecomeActive(_) => ApiError::Conflict(format!("{e}")),
|
||||
GetActiveTenantError::Cancelled => ApiError::ShuttingDown,
|
||||
GetActiveTenantError::NotFound(gte) => gte.into(),
|
||||
GetActiveTenantError::WaitForActiveTimeout { .. } => {
|
||||
ApiError::ResourceUnavailable(format!("{}", e).into())
|
||||
ApiError::ResourceUnavailable(format!("{e}").into())
|
||||
}
|
||||
GetActiveTenantError::SwitchedTenant => {
|
||||
// in our HTTP handlers, this error doesn't happen
|
||||
@@ -1012,7 +1016,7 @@ async fn get_lsn_by_timestamp_handler(
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
let timestamp_raw = must_get_query_param(&request, "timestamp")?;
|
||||
let timestamp = humantime::parse_rfc3339(×tamp_raw)
|
||||
.with_context(|| format!("Invalid time: {:?}", timestamp_raw))
|
||||
.with_context(|| format!("Invalid time: {timestamp_raw:?}"))
|
||||
.map_err(ApiError::BadRequest)?;
|
||||
let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp);
|
||||
|
||||
@@ -1107,7 +1111,7 @@ async fn get_timestamp_of_lsn_handler(
|
||||
json_response(StatusCode::OK, time)
|
||||
}
|
||||
None => Err(ApiError::PreconditionFailed(
|
||||
format!("Timestamp for lsn {} not found", lsn).into(),
|
||||
format!("Timestamp for lsn {lsn} not found").into(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
@@ -2418,7 +2422,7 @@ async fn timeline_offload_handler(
|
||||
}
|
||||
if let (false, reason) = timeline.can_offload() {
|
||||
return Err(ApiError::PreconditionFailed(
|
||||
format!("Timeline::can_offload() check failed: {}", reason) .into(),
|
||||
format!("Timeline::can_offload() check failed: {reason}") .into(),
|
||||
));
|
||||
}
|
||||
offload_timeline(&tenant, &timeline)
|
||||
@@ -3676,8 +3680,8 @@ async fn tenant_evaluate_feature_flag(
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
|
||||
let flag: String = must_parse_query_param(&request, "flag")?;
|
||||
let as_type: String = must_parse_query_param(&request, "as")?;
|
||||
let flag: String = parse_request_param(&request, "flag_key")?;
|
||||
let as_type: Option<String> = parse_query_param(&request, "as")?;
|
||||
|
||||
let state = get_state(&request);
|
||||
|
||||
@@ -3686,11 +3690,11 @@ async fn tenant_evaluate_feature_flag(
|
||||
.tenant_manager
|
||||
.get_attached_tenant_shard(tenant_shard_id)?;
|
||||
let properties = tenant.feature_resolver.collect_properties(tenant_shard_id.tenant_id);
|
||||
if as_type == "boolean" {
|
||||
if as_type.as_deref() == Some("boolean") {
|
||||
let result = tenant.feature_resolver.evaluate_boolean(&flag, tenant_shard_id.tenant_id);
|
||||
let result = result.map(|_| true).map_err(|e| e.to_string());
|
||||
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
|
||||
} else if as_type == "multivariate" {
|
||||
} else if as_type.as_deref() == Some("multivariate") {
|
||||
let result = tenant.feature_resolver.evaluate_multivariate(&flag, tenant_shard_id.tenant_id).map_err(|e| e.to_string());
|
||||
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
|
||||
} else {
|
||||
@@ -3710,6 +3714,35 @@ async fn tenant_evaluate_feature_flag(
|
||||
.await
|
||||
}
|
||||
|
||||
async fn force_override_feature_flag_for_testing_put(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
check_permission(&request, None)?;
|
||||
|
||||
let flag: String = parse_request_param(&request, "flag_key")?;
|
||||
let value: String = must_parse_query_param(&request, "value")?;
|
||||
let state = get_state(&request);
|
||||
state
|
||||
.feature_resolver
|
||||
.force_override_for_testing(&flag, Some(&value));
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn force_override_feature_flag_for_testing_delete(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
check_permission(&request, None)?;
|
||||
|
||||
let flag: String = parse_request_param(&request, "flag_key")?;
|
||||
let state = get_state(&request);
|
||||
state
|
||||
.feature_resolver
|
||||
.force_override_for_testing(&flag, None);
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
/// Common functionality of all the HTTP API handlers.
|
||||
///
|
||||
/// - Adds a tracing span to each request (by `request_span`)
|
||||
@@ -4086,8 +4119,14 @@ pub fn make_router(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/activate_post_import",
|
||||
|r| api_handler(r, activate_post_import_handler),
|
||||
)
|
||||
.get("/v1/tenant/:tenant_shard_id/feature_flag", |r| {
|
||||
.get("/v1/tenant/:tenant_shard_id/feature_flag/:flag_key", |r| {
|
||||
api_handler(r, tenant_evaluate_feature_flag)
|
||||
})
|
||||
.put("/v1/feature_flag/:flag_key", |r| {
|
||||
testing_api_handler("force override feature flag - put", r, force_override_feature_flag_for_testing_put)
|
||||
})
|
||||
.delete("/v1/feature_flag/:flag_key", |r| {
|
||||
testing_api_handler("force override feature flag - delete", r, force_override_feature_flag_for_testing_delete)
|
||||
})
|
||||
.any(handler_404))
|
||||
}
|
||||
|
||||
@@ -1727,12 +1727,7 @@ impl Drop for SmgrOpTimer {
|
||||
|
||||
impl SmgrOpFlushInProgress {
|
||||
/// The caller must guarantee that `socket_fd`` outlives this function.
|
||||
pub(crate) async fn measure<Fut, O>(
|
||||
self,
|
||||
started_at: Instant,
|
||||
mut fut: Fut,
|
||||
socket_fd: RawFd,
|
||||
) -> O
|
||||
pub(crate) async fn measure<Fut, O>(self, started_at: Instant, fut: Fut, socket_fd: RawFd) -> O
|
||||
where
|
||||
Fut: std::future::Future<Output = O>,
|
||||
{
|
||||
@@ -3426,7 +3421,7 @@ impl TimelineMetrics {
|
||||
pub fn dec_frozen_layer(&self, layer: &InMemoryLayer) {
|
||||
assert!(matches!(layer.info(), InMemoryLayerInfo::Frozen { .. }));
|
||||
let labels = self.make_frozen_layer_labels(layer);
|
||||
let size = layer.try_len().expect("frozen layer should have no writer");
|
||||
let size = layer.len();
|
||||
TIMELINE_LAYER_COUNT
|
||||
.get_metric_with_label_values(&labels)
|
||||
.unwrap()
|
||||
@@ -3441,7 +3436,7 @@ impl TimelineMetrics {
|
||||
pub fn inc_frozen_layer(&self, layer: &InMemoryLayer) {
|
||||
assert!(matches!(layer.info(), InMemoryLayerInfo::Frozen { .. }));
|
||||
let labels = self.make_frozen_layer_labels(layer);
|
||||
let size = layer.try_len().expect("frozen layer should have no writer");
|
||||
let size = layer.len();
|
||||
TIMELINE_LAYER_COUNT
|
||||
.get_metric_with_label_values(&labels)
|
||||
.unwrap()
|
||||
|
||||
@@ -392,16 +392,14 @@ async fn page_service_conn_main(
|
||||
} else {
|
||||
let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
|
||||
Err(io_error).context(format!(
|
||||
"Postgres connection error for tenant_id={:?} client at peer_addr={}",
|
||||
tenant_id, peer_addr
|
||||
"Postgres connection error for tenant_id={tenant_id:?} client at peer_addr={peer_addr}"
|
||||
))
|
||||
}
|
||||
}
|
||||
other => {
|
||||
let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
|
||||
other.context(format!(
|
||||
"Postgres query error for tenant_id={:?} client peer_addr={}",
|
||||
tenant_id, peer_addr
|
||||
"Postgres query error for tenant_id={tenant_id:?} client peer_addr={peer_addr}"
|
||||
))
|
||||
}
|
||||
}
|
||||
@@ -2140,8 +2138,7 @@ impl PageServerHandler {
|
||||
if request_lsn < not_modified_since {
|
||||
return Err(PageStreamError::BadRequest(
|
||||
format!(
|
||||
"invalid request with request LSN {} and not_modified_since {}",
|
||||
request_lsn, not_modified_since,
|
||||
"invalid request with request LSN {request_lsn} and not_modified_since {not_modified_since}",
|
||||
)
|
||||
.into(),
|
||||
));
|
||||
@@ -3544,8 +3541,9 @@ impl proto::PageService for GrpcPageServiceHandler {
|
||||
&self,
|
||||
req: tonic::Request<proto::GetBaseBackupRequest>,
|
||||
) -> Result<tonic::Response<Self::GetBaseBackupStream>, tonic::Status> {
|
||||
// Send 64 KB chunks to avoid large memory allocations.
|
||||
const CHUNK_SIZE: usize = 64 * 1024;
|
||||
// Send chunks of 256 KB to avoid large memory allocations. pagebench basebackup shows this
|
||||
// to be the sweet spot where throughput is saturated.
|
||||
const CHUNK_SIZE: usize = 256 * 1024;
|
||||
|
||||
let timeline = self.get_request_timeline(&req).await?;
|
||||
let ctx = self.ctx.with_scope_timeline(&timeline);
|
||||
|
||||
@@ -1185,7 +1185,7 @@ impl Timeline {
|
||||
}
|
||||
let origin_id = k.field6 as RepOriginId;
|
||||
let origin_lsn = Lsn::des(&v)
|
||||
.with_context(|| format!("decode replorigin value for {}: {v:?}", origin_id))?;
|
||||
.with_context(|| format!("decode replorigin value for {origin_id}: {v:?}"))?;
|
||||
if origin_lsn != Lsn::INVALID {
|
||||
result.insert(origin_id, origin_lsn);
|
||||
}
|
||||
@@ -2440,8 +2440,7 @@ impl DatadirModification<'_> {
|
||||
if path == p {
|
||||
assert!(
|
||||
modifying_file.is_none(),
|
||||
"duplicated entries found for {}",
|
||||
path
|
||||
"duplicated entries found for {path}"
|
||||
);
|
||||
modifying_file = Some(content);
|
||||
} else {
|
||||
|
||||
@@ -3450,7 +3450,7 @@ impl TenantShard {
|
||||
use pageserver_api::models::ActivatingFrom;
|
||||
match &*current_state {
|
||||
TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => {
|
||||
panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state);
|
||||
panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {current_state:?}");
|
||||
}
|
||||
TenantState::Attaching => {
|
||||
*current_state = TenantState::Activating(ActivatingFrom::Attaching);
|
||||
@@ -6617,7 +6617,7 @@ mod tests {
|
||||
.put(
|
||||
*TEST_KEY,
|
||||
lsn,
|
||||
&Value::Image(test_img(&format!("foo at {}", lsn))),
|
||||
&Value::Image(test_img(&format!("foo at {lsn}"))),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -6627,7 +6627,7 @@ mod tests {
|
||||
.put(
|
||||
*TEST_KEY,
|
||||
lsn,
|
||||
&Value::Image(test_img(&format!("foo at {}", lsn))),
|
||||
&Value::Image(test_img(&format!("foo at {lsn}"))),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -6641,7 +6641,7 @@ mod tests {
|
||||
.put(
|
||||
*TEST_KEY,
|
||||
lsn,
|
||||
&Value::Image(test_img(&format!("foo at {}", lsn))),
|
||||
&Value::Image(test_img(&format!("foo at {lsn}"))),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -6651,7 +6651,7 @@ mod tests {
|
||||
.put(
|
||||
*TEST_KEY,
|
||||
lsn,
|
||||
&Value::Image(test_img(&format!("foo at {}", lsn))),
|
||||
&Value::Image(test_img(&format!("foo at {lsn}"))),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -7150,7 +7150,7 @@ mod tests {
|
||||
.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
|
||||
&Value::Image(test_img(&format!("{blknum} at {lsn}"))),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -7438,7 +7438,7 @@ mod tests {
|
||||
.put(
|
||||
gap_at_key,
|
||||
current_lsn,
|
||||
&Value::Image(test_img(&format!("{} at {}", gap_at_key, current_lsn))),
|
||||
&Value::Image(test_img(&format!("{gap_at_key} at {current_lsn}"))),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -7477,7 +7477,7 @@ mod tests {
|
||||
.put(
|
||||
current_key,
|
||||
current_lsn,
|
||||
&Value::Image(test_img(&format!("{} at {}", current_key, current_lsn))),
|
||||
&Value::Image(test_img(&format!("{current_key} at {current_lsn}"))),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -7585,7 +7585,7 @@ mod tests {
|
||||
while key < end_key {
|
||||
current_lsn += 0x10;
|
||||
|
||||
let image_value = format!("{} at {}", child_gap_at_key, current_lsn);
|
||||
let image_value = format!("{child_gap_at_key} at {current_lsn}");
|
||||
|
||||
let mut writer = parent_timeline.writer().await;
|
||||
writer
|
||||
@@ -7628,7 +7628,7 @@ mod tests {
|
||||
.put(
|
||||
key,
|
||||
current_lsn,
|
||||
&Value::Image(test_img(&format!("{} at {}", key, current_lsn))),
|
||||
&Value::Image(test_img(&format!("{key} at {current_lsn}"))),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -7749,7 +7749,7 @@ mod tests {
|
||||
.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
|
||||
&Value::Image(test_img(&format!("{blknum} at {lsn}"))),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -7770,7 +7770,7 @@ mod tests {
|
||||
.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
|
||||
&Value::Image(test_img(&format!("{blknum} at {lsn}"))),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -7784,7 +7784,7 @@ mod tests {
|
||||
test_key.field6 = blknum as u32;
|
||||
assert_eq!(
|
||||
tline.get(test_key, lsn, &ctx).await?,
|
||||
test_img(&format!("{} at {}", blknum, last_lsn))
|
||||
test_img(&format!("{blknum} at {last_lsn}"))
|
||||
);
|
||||
}
|
||||
|
||||
@@ -7830,7 +7830,7 @@ mod tests {
|
||||
.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
|
||||
&Value::Image(test_img(&format!("{blknum} at {lsn}"))),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -7859,11 +7859,11 @@ mod tests {
|
||||
.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
|
||||
&Value::Image(test_img(&format!("{blknum} at {lsn}"))),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
println!("updating {} at {}", blknum, lsn);
|
||||
println!("updating {blknum} at {lsn}");
|
||||
writer.finish_write(lsn);
|
||||
drop(writer);
|
||||
updated[blknum] = lsn;
|
||||
@@ -7874,7 +7874,7 @@ mod tests {
|
||||
test_key.field6 = blknum as u32;
|
||||
assert_eq!(
|
||||
tline.get(test_key, lsn, &ctx).await?,
|
||||
test_img(&format!("{} at {}", blknum, last_lsn))
|
||||
test_img(&format!("{blknum} at {last_lsn}"))
|
||||
);
|
||||
}
|
||||
|
||||
@@ -7927,11 +7927,11 @@ mod tests {
|
||||
.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(test_img(&format!("{} {} at {}", idx, blknum, lsn))),
|
||||
&Value::Image(test_img(&format!("{idx} {blknum} at {lsn}"))),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
println!("updating [{}][{}] at {}", idx, blknum, lsn);
|
||||
println!("updating [{idx}][{blknum}] at {lsn}");
|
||||
writer.finish_write(lsn);
|
||||
drop(writer);
|
||||
updated[idx][blknum] = lsn;
|
||||
@@ -8137,7 +8137,7 @@ mod tests {
|
||||
.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
|
||||
&Value::Image(test_img(&format!("{blknum} at {lsn}"))),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -8154,7 +8154,7 @@ mod tests {
|
||||
test_key.field6 = (blknum * STEP) as u32;
|
||||
assert_eq!(
|
||||
tline.get(test_key, lsn, &ctx).await?,
|
||||
test_img(&format!("{} at {}", blknum, last_lsn))
|
||||
test_img(&format!("{blknum} at {last_lsn}"))
|
||||
);
|
||||
}
|
||||
|
||||
@@ -8191,7 +8191,7 @@ mod tests {
|
||||
.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
|
||||
&Value::Image(test_img(&format!("{blknum} at {lsn}"))),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -8444,7 +8444,7 @@ mod tests {
|
||||
.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
|
||||
&Value::Image(test_img(&format!("{blknum} at {lsn}"))),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -8464,7 +8464,7 @@ mod tests {
|
||||
.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
|
||||
&Value::Image(test_img(&format!("{blknum} at {lsn}"))),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -9385,12 +9385,7 @@ mod tests {
|
||||
let end_lsn = Lsn(0x100);
|
||||
let image_layers = (0x20..=0x90)
|
||||
.step_by(0x10)
|
||||
.map(|n| {
|
||||
(
|
||||
Lsn(n),
|
||||
vec![(key, test_img(&format!("data key at {:x}", n)))],
|
||||
)
|
||||
})
|
||||
.map(|n| (Lsn(n), vec![(key, test_img(&format!("data key at {n:x}")))]))
|
||||
.collect();
|
||||
|
||||
let timeline = tenant
|
||||
|
||||
@@ -63,8 +63,7 @@ pub fn check_valid_layermap(metadata: &[LayerName]) -> Option<String> {
|
||||
&& overlaps_with(&layer.key_range, &other_layer.key_range)
|
||||
{
|
||||
let err = format!(
|
||||
"layer violates the layer map LSN split assumption: layer {} intersects with layer {}",
|
||||
layer, other_layer
|
||||
"layer violates the layer map LSN split assumption: layer {layer} intersects with layer {other_layer}"
|
||||
);
|
||||
return Some(err);
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
use std::io;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
|
||||
use camino::Utf8PathBuf;
|
||||
use num_traits::Num;
|
||||
@@ -18,6 +18,7 @@ use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache;
|
||||
use crate::tenant::storage_layer::inmemory_layer::GlobalResourceUnits;
|
||||
use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
|
||||
use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
|
||||
@@ -30,9 +31,13 @@ pub struct EphemeralFile {
|
||||
_tenant_shard_id: TenantShardId,
|
||||
_timeline_id: TimelineId,
|
||||
page_cache_file_id: page_cache::FileId,
|
||||
bytes_written: u64,
|
||||
file: TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter,
|
||||
buffered_writer: BufferedWriter,
|
||||
|
||||
buffered_writer: tokio::sync::RwLock<BufferedWriter>,
|
||||
|
||||
bytes_written: AtomicU64,
|
||||
|
||||
resource_units: std::sync::Mutex<GlobalResourceUnits>,
|
||||
}
|
||||
|
||||
type BufferedWriter = owned_buffers_io::write::BufferedWriter<
|
||||
@@ -94,9 +99,8 @@ impl EphemeralFile {
|
||||
_tenant_shard_id: tenant_shard_id,
|
||||
_timeline_id: timeline_id,
|
||||
page_cache_file_id,
|
||||
bytes_written: 0,
|
||||
file: file.clone(),
|
||||
buffered_writer: BufferedWriter::new(
|
||||
buffered_writer: tokio::sync::RwLock::new(BufferedWriter::new(
|
||||
file,
|
||||
0,
|
||||
|| IoBufferMut::with_capacity(TAIL_SZ),
|
||||
@@ -104,7 +108,9 @@ impl EphemeralFile {
|
||||
cancel.child_token(),
|
||||
ctx,
|
||||
info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
|
||||
),
|
||||
)),
|
||||
bytes_written: AtomicU64::new(0),
|
||||
resource_units: std::sync::Mutex::new(GlobalResourceUnits::new()),
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -151,15 +157,17 @@ impl std::ops::Deref for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum EphemeralFileWriteError {
|
||||
#[error("{0}")]
|
||||
TooLong(String),
|
||||
#[error("cancelled")]
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
impl EphemeralFile {
|
||||
pub(crate) fn len(&self) -> u64 {
|
||||
self.bytes_written
|
||||
// TODO(vlad): The value returned here is not always correct if
|
||||
// we have more than one concurrent writer. Writes are always
|
||||
// sequenced, but we could grab the buffered writer lock if we wanted
|
||||
// to.
|
||||
self.bytes_written.load(Ordering::Acquire)
|
||||
}
|
||||
|
||||
pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
|
||||
@@ -186,7 +194,7 @@ impl EphemeralFile {
|
||||
/// Panics if the write is short because there's no way we can recover from that.
|
||||
/// TODO: make upstack handle this as an error.
|
||||
pub(crate) async fn write_raw(
|
||||
&mut self,
|
||||
&self,
|
||||
srcbuf: &[u8],
|
||||
ctx: &RequestContext,
|
||||
) -> Result<u64, EphemeralFileWriteError> {
|
||||
@@ -198,22 +206,13 @@ impl EphemeralFile {
|
||||
}
|
||||
|
||||
async fn write_raw_controlled(
|
||||
&mut self,
|
||||
&self,
|
||||
srcbuf: &[u8],
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(u64, Option<owned_buffers_io::write::FlushControl>), EphemeralFileWriteError> {
|
||||
let pos = self.bytes_written;
|
||||
let mut writer = self.buffered_writer.write().await;
|
||||
|
||||
let new_bytes_written = pos.checked_add(srcbuf.len().into_u64()).ok_or_else(|| {
|
||||
EphemeralFileWriteError::TooLong(format!(
|
||||
"write would grow EphemeralFile beyond u64::MAX: len={pos} writen={srcbuf_len}",
|
||||
srcbuf_len = srcbuf.len(),
|
||||
))
|
||||
})?;
|
||||
|
||||
// Write the payload
|
||||
let (nwritten, control) = self
|
||||
.buffered_writer
|
||||
let (nwritten, control) = writer
|
||||
.write_buffered_borrowed_controlled(srcbuf, ctx)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
@@ -225,43 +224,69 @@ impl EphemeralFile {
|
||||
"buffered writer has no short writes"
|
||||
);
|
||||
|
||||
self.bytes_written = new_bytes_written;
|
||||
// There's no realistic risk of overflow here. We won't have exabytes sized files on disk.
|
||||
let pos = self
|
||||
.bytes_written
|
||||
.fetch_add(srcbuf.len().into_u64(), Ordering::AcqRel);
|
||||
|
||||
let mut resource_units = self.resource_units.lock().unwrap();
|
||||
resource_units.maybe_publish_size(self.bytes_written.load(Ordering::Relaxed));
|
||||
|
||||
Ok((pos, control))
|
||||
}
|
||||
|
||||
pub(crate) fn tick(&self) -> Option<u64> {
|
||||
let mut resource_units = self.resource_units.lock().unwrap();
|
||||
let len = self.bytes_written.load(Ordering::Relaxed);
|
||||
resource_units.publish_size(len)
|
||||
}
|
||||
}
|
||||
|
||||
impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile {
|
||||
async fn read_exact_at_eof_ok<B: IoBufAlignedMut + Send>(
|
||||
&self,
|
||||
start: u64,
|
||||
dst: tokio_epoll_uring::Slice<B>,
|
||||
mut dst: tokio_epoll_uring::Slice<B>,
|
||||
ctx: &RequestContext,
|
||||
) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
|
||||
let submitted_offset = self.buffered_writer.bytes_submitted();
|
||||
// We will fill the slice in back to front. Hence, we need
|
||||
// the slice to be fully initialized.
|
||||
// TODO(vlad): Is there a nicer way of doing this?
|
||||
dst.as_mut_rust_slice_full_zeroed();
|
||||
|
||||
let mutable = match self.buffered_writer.inspect_mutable() {
|
||||
Some(mutable) => &mutable[0..mutable.pending()],
|
||||
None => {
|
||||
// Timeline::cancel and hence buffered writer flush was cancelled.
|
||||
// Remain read-available while timeline is shutting down.
|
||||
&[]
|
||||
}
|
||||
};
|
||||
let writer = self.buffered_writer.read().await;
|
||||
|
||||
let maybe_flushed = self.buffered_writer.inspect_maybe_flushed();
|
||||
// Read bytes written while under lock. This is a hack to deal with concurrent
|
||||
// writes updating the number of bytes written. `bytes_written` is not DIO alligned
|
||||
// but we may end the read there.
|
||||
//
|
||||
// TODO(vlad): Feels like there's a nicer path where we align the end if it
|
||||
// shoots over the end of the file.
|
||||
let bytes_written = self.bytes_written.load(Ordering::Acquire);
|
||||
|
||||
let dst_cap = dst.bytes_total().into_u64();
|
||||
let end = {
|
||||
// saturating_add is correct here because the max file size is u64::MAX, so,
|
||||
// if start + dst.len() > u64::MAX, then we know it will be a short read
|
||||
let mut end: u64 = start.saturating_add(dst_cap);
|
||||
if end > self.bytes_written {
|
||||
end = self.bytes_written;
|
||||
if end > bytes_written {
|
||||
end = bytes_written;
|
||||
}
|
||||
end
|
||||
};
|
||||
|
||||
let submitted_offset = writer.bytes_submitted();
|
||||
let maybe_flushed = writer.inspect_maybe_flushed();
|
||||
|
||||
let mutable = match writer.inspect_mutable() {
|
||||
Some(mutable) => &mutable[0..mutable.pending()],
|
||||
None => {
|
||||
// Timeline::cancel and hence buffered writer flush was cancelled.
|
||||
// Remain read-available while timeline is shutting down.
|
||||
&[]
|
||||
}
|
||||
};
|
||||
|
||||
// inclusive, exclusive
|
||||
#[derive(Debug)]
|
||||
struct Range<N>(N, N);
|
||||
@@ -306,13 +331,33 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
|
||||
|
||||
let mutable_range = Range(std::cmp::max(start, submitted_offset), end);
|
||||
|
||||
let dst = if written_range.len() > 0 {
|
||||
// There are three sources from which we might have to read data:
|
||||
// 1. The file itself
|
||||
// 2. The buffer which contains changes currently being flushed
|
||||
// 3. The buffer which contains chnages yet to be flushed
|
||||
//
|
||||
// For better concurrency, we do them in reverse order: perform the in-memory
|
||||
// reads while holding the writer lock, drop the writer lock and read from the
|
||||
// file if required.
|
||||
|
||||
let dst = if mutable_range.len() > 0 {
|
||||
let offset_in_buffer = mutable_range
|
||||
.0
|
||||
.checked_sub(submitted_offset)
|
||||
.unwrap()
|
||||
.into_usize();
|
||||
let to_copy =
|
||||
&mutable[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())];
|
||||
let bounds = dst.bounds();
|
||||
let slice = self
|
||||
.file
|
||||
.read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
|
||||
.await?;
|
||||
Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
|
||||
let mut view = dst.slice({
|
||||
let start =
|
||||
written_range.len().into_usize() + maybe_flushed_range.len().into_usize();
|
||||
let end = start.checked_add(mutable_range.len().into_usize()).unwrap();
|
||||
start..end
|
||||
});
|
||||
view.as_mut_rust_slice_full_zeroed()
|
||||
.copy_from_slice(to_copy);
|
||||
Slice::from_buf_bounds(Slice::into_inner(view), bounds)
|
||||
} else {
|
||||
dst
|
||||
};
|
||||
@@ -342,24 +387,15 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
|
||||
dst
|
||||
};
|
||||
|
||||
let dst = if mutable_range.len() > 0 {
|
||||
let offset_in_buffer = mutable_range
|
||||
.0
|
||||
.checked_sub(submitted_offset)
|
||||
.unwrap()
|
||||
.into_usize();
|
||||
let to_copy =
|
||||
&mutable[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())];
|
||||
drop(writer);
|
||||
|
||||
let dst = if written_range.len() > 0 {
|
||||
let bounds = dst.bounds();
|
||||
let mut view = dst.slice({
|
||||
let start =
|
||||
written_range.len().into_usize() + maybe_flushed_range.len().into_usize();
|
||||
let end = start.checked_add(mutable_range.len().into_usize()).unwrap();
|
||||
start..end
|
||||
});
|
||||
view.as_mut_rust_slice_full_zeroed()
|
||||
.copy_from_slice(to_copy);
|
||||
Slice::from_buf_bounds(Slice::into_inner(view), bounds)
|
||||
let slice = self
|
||||
.file
|
||||
.read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
|
||||
.await?;
|
||||
Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
|
||||
} else {
|
||||
dst
|
||||
};
|
||||
@@ -460,13 +496,15 @@ mod tests {
|
||||
let gate = utils::sync::gate::Gate::default();
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
|
||||
let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mutable = file.buffered_writer.mutable();
|
||||
let writer = file.buffered_writer.read().await;
|
||||
let mutable = writer.mutable();
|
||||
let cap = mutable.capacity();
|
||||
let align = mutable.align();
|
||||
drop(writer);
|
||||
|
||||
let write_nbytes = cap * 2 + cap / 2;
|
||||
|
||||
@@ -504,10 +542,11 @@ mod tests {
|
||||
let file_contents = std::fs::read(file.file.path()).unwrap();
|
||||
assert!(file_contents == content[0..cap * 2]);
|
||||
|
||||
let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
|
||||
let writer = file.buffered_writer.read().await;
|
||||
let maybe_flushed_buffer_contents = writer.inspect_maybe_flushed().unwrap();
|
||||
assert_eq!(&maybe_flushed_buffer_contents[..], &content[cap..cap * 2]);
|
||||
|
||||
let mutable_buffer_contents = file.buffered_writer.mutable();
|
||||
let mutable_buffer_contents = writer.mutable();
|
||||
assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]);
|
||||
}
|
||||
|
||||
@@ -517,12 +556,14 @@ mod tests {
|
||||
|
||||
let gate = utils::sync::gate::Gate::default();
|
||||
let cancel = CancellationToken::new();
|
||||
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
|
||||
let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// mutable buffer and maybe_flushed buffer each has `cap` bytes.
|
||||
let cap = file.buffered_writer.mutable().capacity();
|
||||
let writer = file.buffered_writer.read().await;
|
||||
let cap = writer.mutable().capacity();
|
||||
drop(writer);
|
||||
|
||||
let content: Vec<u8> = rand::thread_rng()
|
||||
.sample_iter(rand::distributions::Standard)
|
||||
@@ -540,12 +581,13 @@ mod tests {
|
||||
2 * cap.into_u64(),
|
||||
"buffered writer requires one write to be flushed if we write 2.5x buffer capacity"
|
||||
);
|
||||
let writer = file.buffered_writer.read().await;
|
||||
assert_eq!(
|
||||
&file.buffered_writer.inspect_maybe_flushed().unwrap()[0..cap],
|
||||
&writer.inspect_maybe_flushed().unwrap()[0..cap],
|
||||
&content[cap..cap * 2]
|
||||
);
|
||||
assert_eq!(
|
||||
&file.buffered_writer.mutable()[0..cap / 2],
|
||||
&writer.mutable()[0..cap / 2],
|
||||
&content[cap * 2..cap * 2 + cap / 2]
|
||||
);
|
||||
}
|
||||
@@ -563,13 +605,15 @@ mod tests {
|
||||
let gate = utils::sync::gate::Gate::default();
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
|
||||
let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mutable = file.buffered_writer.mutable();
|
||||
let writer = file.buffered_writer.read().await;
|
||||
let mutable = writer.mutable();
|
||||
let cap = mutable.capacity();
|
||||
let align = mutable.align();
|
||||
drop(writer);
|
||||
let content: Vec<u8> = rand::thread_rng()
|
||||
.sample_iter(rand::distributions::Standard)
|
||||
.take(cap * 2 + cap / 2)
|
||||
|
||||
@@ -551,8 +551,7 @@ mod tests {
|
||||
|
||||
assert_eq!(
|
||||
deserialized_metadata.body, expected_metadata.body,
|
||||
"Metadata of the old version {} should be upgraded to the latest version {}",
|
||||
METADATA_OLD_FORMAT_VERSION, METADATA_FORMAT_VERSION
|
||||
"Metadata of the old version {METADATA_OLD_FORMAT_VERSION} should be upgraded to the latest version {METADATA_FORMAT_VERSION}"
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -1427,7 +1427,7 @@ async fn init_timeline_state(
|
||||
let local_meta = dentry
|
||||
.metadata()
|
||||
.await
|
||||
.fatal_err(&format!("Read metadata on {}", file_path));
|
||||
.fatal_err(&format!("Read metadata on {file_path}"));
|
||||
|
||||
let file_name = file_path.file_name().expect("created it from the dentry");
|
||||
if crate::is_temporary(&file_path)
|
||||
|
||||
@@ -109,7 +109,7 @@ pub(crate) enum OnDiskValue {
|
||||
|
||||
/// Reconstruct data accumulated for a single key during a vectored get
|
||||
#[derive(Debug, Default)]
|
||||
pub(crate) struct VectoredValueReconstructState {
|
||||
pub struct VectoredValueReconstructState {
|
||||
pub(crate) on_disk_values: Vec<(Lsn, OnDiskValueIoWaiter)>,
|
||||
|
||||
pub(crate) situation: ValueReconstructSituation,
|
||||
@@ -244,13 +244,60 @@ impl VectoredValueReconstructState {
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
/// Benchmarking utility to await for the completion of all pending ios
|
||||
///
|
||||
/// # Cancel-Safety
|
||||
///
|
||||
/// Technically fine to stop polling this future, but, the IOs will still
|
||||
/// be executed to completion by the sidecar task and hold on to / consume resources.
|
||||
/// Better not do it to make reasonsing about the system easier.
|
||||
#[cfg(feature = "benchmarking")]
|
||||
pub async fn sink_pending_ios(self) -> Result<(), std::io::Error> {
|
||||
let mut res = Ok(());
|
||||
|
||||
// We should try hard not to bail early, so that by the time we return from this
|
||||
// function, all IO for this value is done. It's not required -- we could totally
|
||||
// stop polling the IO futures in the sidecar task, they need to support that,
|
||||
// but just stopping to poll doesn't reduce the IO load on the disk. It's easier
|
||||
// to reason about the system if we just wait for all IO to complete, even if
|
||||
// we're no longer interested in the result.
|
||||
//
|
||||
// Revisit this when IO futures are replaced with a more sophisticated IO system
|
||||
// and an IO scheduler, where we know which IOs were submitted and which ones
|
||||
// just queued. Cf the comment on IoConcurrency::spawn_io.
|
||||
for (_lsn, waiter) in self.on_disk_values {
|
||||
let value_recv_res = waiter
|
||||
.wait_completion()
|
||||
// we rely on the caller to poll us to completion, so this is not a bail point
|
||||
.await;
|
||||
|
||||
match (&mut res, value_recv_res) {
|
||||
(Err(_), _) => {
|
||||
// We've already failed, no need to process more.
|
||||
}
|
||||
(Ok(_), Err(_wait_err)) => {
|
||||
// This shouldn't happen - likely the sidecar task panicked.
|
||||
unreachable!();
|
||||
}
|
||||
(Ok(_), Ok(Err(err))) => {
|
||||
let err: std::io::Error = err;
|
||||
res = Err(err);
|
||||
}
|
||||
(Ok(_ok), Ok(Ok(OnDiskValue::RawImage(_img)))) => {}
|
||||
(Ok(_ok), Ok(Ok(OnDiskValue::WalRecordOrImage(_buf)))) => {}
|
||||
}
|
||||
}
|
||||
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
/// Bag of data accumulated during a vectored get..
|
||||
pub(crate) struct ValuesReconstructState {
|
||||
pub struct ValuesReconstructState {
|
||||
/// The keys will be removed after `get_vectored` completes. The caller outside `Timeline`
|
||||
/// should not expect to get anything from this hashmap.
|
||||
pub(crate) keys: HashMap<Key, VectoredValueReconstructState>,
|
||||
pub keys: HashMap<Key, VectoredValueReconstructState>,
|
||||
/// The keys which are already retrieved
|
||||
keys_done: KeySpaceRandomAccum,
|
||||
|
||||
@@ -272,7 +319,7 @@ pub(crate) struct ValuesReconstructState {
|
||||
/// The desired end state is that we always do parallel IO.
|
||||
/// This struct and the dispatching in the impl will be removed once
|
||||
/// we've built enough confidence.
|
||||
pub(crate) enum IoConcurrency {
|
||||
pub enum IoConcurrency {
|
||||
Sequential,
|
||||
SidecarTask {
|
||||
task_id: usize,
|
||||
@@ -317,10 +364,7 @@ impl IoConcurrency {
|
||||
Self::spawn(SelectedIoConcurrency::Sequential)
|
||||
}
|
||||
|
||||
pub(crate) fn spawn_from_conf(
|
||||
conf: GetVectoredConcurrentIo,
|
||||
gate_guard: GateGuard,
|
||||
) -> IoConcurrency {
|
||||
pub fn spawn_from_conf(conf: GetVectoredConcurrentIo, gate_guard: GateGuard) -> IoConcurrency {
|
||||
let selected = match conf {
|
||||
GetVectoredConcurrentIo::Sequential => SelectedIoConcurrency::Sequential,
|
||||
GetVectoredConcurrentIo::SidecarTask => SelectedIoConcurrency::SidecarTask(gate_guard),
|
||||
@@ -425,16 +469,6 @@ impl IoConcurrency {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn clone(&self) -> Self {
|
||||
match self {
|
||||
IoConcurrency::Sequential => IoConcurrency::Sequential,
|
||||
IoConcurrency::SidecarTask { task_id, ios_tx } => IoConcurrency::SidecarTask {
|
||||
task_id: *task_id,
|
||||
ios_tx: ios_tx.clone(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Submit an IO to be executed in the background. DEADLOCK RISK, read the full doc string.
|
||||
///
|
||||
/// The IO is represented as an opaque future.
|
||||
@@ -573,6 +607,18 @@ impl IoConcurrency {
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for IoConcurrency {
|
||||
fn clone(&self) -> Self {
|
||||
match self {
|
||||
IoConcurrency::Sequential => IoConcurrency::Sequential,
|
||||
IoConcurrency::SidecarTask { task_id, ios_tx } => IoConcurrency::SidecarTask {
|
||||
task_id: *task_id,
|
||||
ios_tx: ios_tx.clone(),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Make noise in case the [`ValuesReconstructState`] gets dropped while
|
||||
/// there are still IOs in flight.
|
||||
/// Refer to `collect_pending_ios` for why we prefer not to do that.
|
||||
@@ -603,7 +649,7 @@ impl Drop for ValuesReconstructState {
|
||||
}
|
||||
|
||||
impl ValuesReconstructState {
|
||||
pub(crate) fn new(io_concurrency: IoConcurrency) -> Self {
|
||||
pub fn new(io_concurrency: IoConcurrency) -> Self {
|
||||
Self {
|
||||
keys: HashMap::new(),
|
||||
keys_done: KeySpaceRandomAccum::new(),
|
||||
|
||||
@@ -783,7 +783,7 @@ impl DeltaLayer {
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("Failed to open file '{}'", path))?;
|
||||
.with_context(|| format!("Failed to open file '{path}'"))?;
|
||||
let file_id = page_cache::next_file_id();
|
||||
let block_reader = FileBlockReader::new(&file, file_id);
|
||||
let summary_blk = block_reader.read_blk(0, ctx).await?;
|
||||
@@ -1401,7 +1401,7 @@ impl DeltaLayerInner {
|
||||
match val {
|
||||
Value::Image(img) => {
|
||||
let checkpoint = CheckPoint::decode(&img)?;
|
||||
println!(" CHECKPOINT: {:?}", checkpoint);
|
||||
println!(" CHECKPOINT: {checkpoint:?}");
|
||||
}
|
||||
Value::WalRecord(_rec) => {
|
||||
println!(" unexpected walrecord value for checkpoint key");
|
||||
|
||||
@@ -272,8 +272,7 @@ impl ImageLayer {
|
||||
|
||||
conf.timeline_path(&tenant_shard_id, &timeline_id)
|
||||
.join(format!(
|
||||
"{fname}.{:x}.{TEMP_FILE_SUFFIX}",
|
||||
filename_disambiguator
|
||||
"{fname}.{filename_disambiguator:x}.{TEMP_FILE_SUFFIX}"
|
||||
))
|
||||
}
|
||||
|
||||
@@ -370,7 +369,7 @@ impl ImageLayer {
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("Failed to open file '{}'", path))?;
|
||||
.with_context(|| format!("Failed to open file '{path}'"))?;
|
||||
let file_id = page_cache::next_file_id();
|
||||
let block_reader = FileBlockReader::new(&file, file_id);
|
||||
let summary_blk = block_reader.read_blk(0, ctx).await?;
|
||||
@@ -1475,7 +1474,7 @@ mod test {
|
||||
assert_eq!(l1, expect_lsn);
|
||||
assert_eq!(&i1, i2);
|
||||
}
|
||||
(o1, o2) => panic!("iterators length mismatch: {:?}, {:?}", o1, o2),
|
||||
(o1, o2) => panic!("iterators length mismatch: {o1:?}, {o2:?}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -70,23 +70,15 @@ pub struct InMemoryLayer {
|
||||
/// We use a separate lock for the index to reduce the critical section
|
||||
/// during which reads cannot be planned.
|
||||
///
|
||||
/// If you need access to both the index and the underlying file at the same time,
|
||||
/// respect the following locking order to avoid deadlocks:
|
||||
/// 1. [`InMemoryLayer::inner`]
|
||||
/// 2. [`InMemoryLayer::index`]
|
||||
///
|
||||
/// Note that the file backing [`InMemoryLayer::inner`] is append-only,
|
||||
/// so it is not necessary to hold simultaneous locks on index.
|
||||
/// This avoids holding index locks across IO, and is crucial for avoiding read tail latency.
|
||||
/// Note that the file backing [`InMemoryLayer::file`] is append-only,
|
||||
/// so it is not necessary to hold a lock on the index while reading or writing from the file.
|
||||
/// In particular:
|
||||
/// 1. It is safe to read and release [`InMemoryLayer::index`] before locking and reading from [`InMemoryLayer::inner`].
|
||||
/// 2. It is safe to write and release [`InMemoryLayer::inner`] before locking and updating [`InMemoryLayer::index`].
|
||||
/// 1. It is safe to read and release [`InMemoryLayer::index`] before reading from [`InMemoryLayer::file`].
|
||||
/// 2. It is safe to write to [`InMemoryLayer::file`] before locking and updating [`InMemoryLayer::index`].
|
||||
index: RwLock<BTreeMap<CompactKey, VecMap<Lsn, IndexEntry>>>,
|
||||
|
||||
/// The above fields never change, except for `end_lsn`, which is only set once,
|
||||
/// and `index` (see rationale there).
|
||||
/// All other changing parts are in `inner`, and protected by a mutex.
|
||||
inner: RwLock<InMemoryLayerInner>,
|
||||
/// Wrapper for the actual on-disk file. Uses interior mutability for concurrent reads/writes.
|
||||
file: EphemeralFile,
|
||||
|
||||
estimated_in_mem_size: AtomicU64,
|
||||
}
|
||||
@@ -96,20 +88,10 @@ impl std::fmt::Debug for InMemoryLayer {
|
||||
f.debug_struct("InMemoryLayer")
|
||||
.field("start_lsn", &self.start_lsn)
|
||||
.field("end_lsn", &self.end_lsn)
|
||||
.field("inner", &self.inner)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct InMemoryLayerInner {
|
||||
/// The values are stored in a serialized format in this file.
|
||||
/// Each serialized Value is preceded by a 'u32' length field.
|
||||
/// PerSeg::page_versions map stores offsets into this file.
|
||||
file: EphemeralFile,
|
||||
|
||||
resource_units: GlobalResourceUnits,
|
||||
}
|
||||
|
||||
/// Support the same max blob length as blob_io, because ultimately
|
||||
/// all the InMemoryLayer contents end up being written into a delta layer,
|
||||
/// using the [`crate::tenant::blob_io`].
|
||||
@@ -258,12 +240,6 @@ struct IndexEntryUnpacked {
|
||||
pos: u64,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for InMemoryLayerInner {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("InMemoryLayerInner").finish()
|
||||
}
|
||||
}
|
||||
|
||||
/// State shared by all in-memory (ephemeral) layers. Updated infrequently during background ticks in Timeline,
|
||||
/// to minimize contention.
|
||||
///
|
||||
@@ -280,7 +256,7 @@ pub(crate) struct GlobalResources {
|
||||
}
|
||||
|
||||
// Per-timeline RAII struct for its contribution to [`GlobalResources`]
|
||||
struct GlobalResourceUnits {
|
||||
pub(crate) struct GlobalResourceUnits {
|
||||
// How many dirty bytes have I added to the global dirty_bytes: this guard object is responsible
|
||||
// for decrementing the global counter by this many bytes when dropped.
|
||||
dirty_bytes: u64,
|
||||
@@ -292,7 +268,7 @@ impl GlobalResourceUnits {
|
||||
// updated when the Timeline "ticks" in the background.
|
||||
const MAX_SIZE_DRIFT: u64 = 10 * 1024 * 1024;
|
||||
|
||||
fn new() -> Self {
|
||||
pub(crate) fn new() -> Self {
|
||||
GLOBAL_RESOURCES
|
||||
.dirty_layers
|
||||
.fetch_add(1, AtomicOrdering::Relaxed);
|
||||
@@ -304,7 +280,7 @@ impl GlobalResourceUnits {
|
||||
///
|
||||
/// Returns the effective layer size limit that should be applied, if any, to keep
|
||||
/// the total number of dirty bytes below the configured maximum.
|
||||
fn publish_size(&mut self, size: u64) -> Option<u64> {
|
||||
pub(crate) fn publish_size(&mut self, size: u64) -> Option<u64> {
|
||||
let new_global_dirty_bytes = match size.cmp(&self.dirty_bytes) {
|
||||
Ordering::Equal => GLOBAL_RESOURCES.dirty_bytes.load(AtomicOrdering::Relaxed),
|
||||
Ordering::Greater => {
|
||||
@@ -349,7 +325,7 @@ impl GlobalResourceUnits {
|
||||
|
||||
// Call publish_size if the input size differs from last published size by more than
|
||||
// the drift limit
|
||||
fn maybe_publish_size(&mut self, size: u64) {
|
||||
pub(crate) fn maybe_publish_size(&mut self, size: u64) {
|
||||
let publish = match size.cmp(&self.dirty_bytes) {
|
||||
Ordering::Equal => false,
|
||||
Ordering::Greater => size - self.dirty_bytes > Self::MAX_SIZE_DRIFT,
|
||||
@@ -398,8 +374,8 @@ impl InMemoryLayer {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn try_len(&self) -> Option<u64> {
|
||||
self.inner.try_read().map(|i| i.file.len()).ok()
|
||||
pub(crate) fn len(&self) -> u64 {
|
||||
self.file.len()
|
||||
}
|
||||
|
||||
pub(crate) fn assert_writable(&self) {
|
||||
@@ -430,7 +406,7 @@ impl InMemoryLayer {
|
||||
|
||||
// Look up the keys in the provided keyspace and update
|
||||
// the reconstruct state with whatever is found.
|
||||
pub(crate) async fn get_values_reconstruct_data(
|
||||
pub async fn get_values_reconstruct_data(
|
||||
self: &Arc<InMemoryLayer>,
|
||||
keyspace: KeySpace,
|
||||
lsn_range: Range<Lsn>,
|
||||
@@ -479,14 +455,13 @@ impl InMemoryLayer {
|
||||
}
|
||||
}
|
||||
}
|
||||
drop(index); // release the lock before we spawn the IO; if it's serial-mode IO we will deadlock on the read().await below
|
||||
drop(index); // release the lock before we spawn the IO
|
||||
let read_from = Arc::clone(self);
|
||||
let read_ctx = ctx.attached_child();
|
||||
reconstruct_state
|
||||
.spawn_io(async move {
|
||||
let inner = read_from.inner.read().await;
|
||||
let f = vectored_dio_read::execute(
|
||||
&inner.file,
|
||||
&read_from.file,
|
||||
reads
|
||||
.iter()
|
||||
.flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)),
|
||||
@@ -518,7 +493,6 @@ impl InMemoryLayer {
|
||||
// This is kinda forced for InMemoryLayer because we need to inner.read() anyway,
|
||||
// but it's less obvious for DeltaLayer and ImageLayer. So, keep this explicit
|
||||
// drop for consistency among all three layer types.
|
||||
drop(inner);
|
||||
drop(read_from);
|
||||
})
|
||||
.await;
|
||||
@@ -537,7 +511,7 @@ fn inmem_layer_log_display(
|
||||
start_lsn: Lsn,
|
||||
end_lsn: Lsn,
|
||||
) -> std::fmt::Result {
|
||||
write!(f, "timeline {} in-memory ", timeline)?;
|
||||
write!(f, "timeline {timeline} in-memory ")?;
|
||||
inmem_layer_display(f, start_lsn, end_lsn)
|
||||
}
|
||||
|
||||
@@ -549,12 +523,6 @@ impl std::fmt::Display for InMemoryLayer {
|
||||
}
|
||||
|
||||
impl InMemoryLayer {
|
||||
/// Get layer size.
|
||||
pub async fn size(&self) -> Result<u64> {
|
||||
let inner = self.inner.read().await;
|
||||
Ok(inner.file.len())
|
||||
}
|
||||
|
||||
pub fn estimated_in_mem_size(&self) -> u64 {
|
||||
self.estimated_in_mem_size.load(AtomicOrdering::Relaxed)
|
||||
}
|
||||
@@ -587,10 +555,7 @@ impl InMemoryLayer {
|
||||
end_lsn: OnceLock::new(),
|
||||
opened_at: Instant::now(),
|
||||
index: RwLock::new(BTreeMap::new()),
|
||||
inner: RwLock::new(InMemoryLayerInner {
|
||||
file,
|
||||
resource_units: GlobalResourceUnits::new(),
|
||||
}),
|
||||
file,
|
||||
estimated_in_mem_size: AtomicU64::new(0),
|
||||
})
|
||||
}
|
||||
@@ -599,41 +564,37 @@ impl InMemoryLayer {
|
||||
///
|
||||
/// Errors are not retryable, the [`InMemoryLayer`] must be discarded, and not be read from.
|
||||
/// The reason why it's not retryable is that the [`EphemeralFile`] writes are not retryable.
|
||||
///
|
||||
/// This method shall not be called concurrently. We enforce this property via [`crate::tenant::Timeline::write_lock`].
|
||||
///
|
||||
/// TODO: it can be made retryable if we aborted the process on EphemeralFile write errors.
|
||||
pub async fn put_batch(
|
||||
&self,
|
||||
serialized_batch: SerializedValueBatch,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let (base_offset, metadata) = {
|
||||
let mut inner = self.inner.write().await;
|
||||
self.assert_writable();
|
||||
self.assert_writable();
|
||||
|
||||
let base_offset = inner.file.len();
|
||||
let base_offset = self.file.len();
|
||||
|
||||
let SerializedValueBatch {
|
||||
raw,
|
||||
metadata,
|
||||
max_lsn: _,
|
||||
len: _,
|
||||
} = serialized_batch;
|
||||
let SerializedValueBatch {
|
||||
raw,
|
||||
metadata,
|
||||
max_lsn: _,
|
||||
len: _,
|
||||
} = serialized_batch;
|
||||
|
||||
// Write the batch to the file
|
||||
inner.file.write_raw(&raw, ctx).await?;
|
||||
let new_size = inner.file.len();
|
||||
// Write the batch to the file
|
||||
self.file.write_raw(&raw, ctx).await?;
|
||||
let new_size = self.file.len();
|
||||
|
||||
let expected_new_len = base_offset
|
||||
.checked_add(raw.len().into_u64())
|
||||
// write_raw would error if we were to overflow u64.
|
||||
// also IndexEntry and higher levels in
|
||||
//the code don't allow the file to grow that large
|
||||
.unwrap();
|
||||
assert_eq!(new_size, expected_new_len);
|
||||
|
||||
inner.resource_units.maybe_publish_size(new_size);
|
||||
|
||||
(base_offset, metadata)
|
||||
};
|
||||
let expected_new_len = base_offset
|
||||
.checked_add(raw.len().into_u64())
|
||||
// write_raw would error if we were to overflow u64.
|
||||
// also IndexEntry and higher levels in
|
||||
//the code don't allow the file to grow that large
|
||||
.unwrap();
|
||||
assert_eq!(new_size, expected_new_len);
|
||||
|
||||
// Update the index with the new entries
|
||||
let mut index = self.index.write().await;
|
||||
@@ -686,10 +647,8 @@ impl InMemoryLayer {
|
||||
self.opened_at
|
||||
}
|
||||
|
||||
pub(crate) async fn tick(&self) -> Option<u64> {
|
||||
let mut inner = self.inner.write().await;
|
||||
let size = inner.file.len();
|
||||
inner.resource_units.publish_size(size)
|
||||
pub(crate) fn tick(&self) -> Option<u64> {
|
||||
self.file.tick()
|
||||
}
|
||||
|
||||
pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {
|
||||
@@ -753,12 +712,6 @@ impl InMemoryLayer {
|
||||
gate: &utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
|
||||
// Grab the lock in read-mode. We hold it over the I/O, but because this
|
||||
// layer is not writeable anymore, no one should be trying to acquire the
|
||||
// write lock on it, so we shouldn't block anyone. See the comment on
|
||||
// [`InMemoryLayer::freeze`] to understand how locking between the append path
|
||||
// and layer flushing works.
|
||||
let inner = self.inner.read().await;
|
||||
let index = self.index.read().await;
|
||||
|
||||
use l0_flush::Inner;
|
||||
@@ -793,7 +746,7 @@ impl InMemoryLayer {
|
||||
|
||||
match l0_flush_global_state {
|
||||
l0_flush::Inner::Direct { .. } => {
|
||||
let file_contents = inner.file.load_to_io_buf(ctx).await?;
|
||||
let file_contents = self.file.load_to_io_buf(ctx).await?;
|
||||
let file_contents = file_contents.freeze();
|
||||
|
||||
for (key, vec_map) in index.iter() {
|
||||
|
||||
@@ -380,7 +380,7 @@ impl<B: Buffer> std::fmt::Debug for LogicalReadState<B> {
|
||||
write!(f, "Ongoing({:?})", BufferDebug::from(b as &dyn Buffer))
|
||||
}
|
||||
LogicalReadState::Ok(b) => write!(f, "Ok({:?})", BufferDebug::from(b as &dyn Buffer)),
|
||||
LogicalReadState::Error(e) => write!(f, "Error({:?})", e),
|
||||
LogicalReadState::Error(e) => write!(f, "Error({e:?})"),
|
||||
LogicalReadState::Undefined => write!(f, "Undefined"),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -105,7 +105,7 @@ impl std::fmt::Display for Layer {
|
||||
|
||||
impl std::fmt::Debug for Layer {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self)
|
||||
write!(f, "{self}")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -178,7 +178,7 @@ pub enum LastImageLayerCreationStatus {
|
||||
|
||||
impl std::fmt::Display for ImageLayerCreationMode {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{:?}", self)
|
||||
write!(f, "{self:?}")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -632,7 +632,7 @@ pub enum ReadPathLayerId {
|
||||
impl std::fmt::Display for ReadPathLayerId {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
ReadPathLayerId::PersistentLayer(key) => write!(f, "{}", key),
|
||||
ReadPathLayerId::PersistentLayer(key) => write!(f, "{key}"),
|
||||
ReadPathLayerId::InMemoryLayer(range) => {
|
||||
write!(f, "in-mem {}..{}", range.start, range.end)
|
||||
}
|
||||
@@ -708,7 +708,7 @@ impl MissingKeyError {
|
||||
|
||||
impl std::fmt::Debug for MissingKeyError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self)
|
||||
write!(f, "{self}")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -721,19 +721,19 @@ impl std::fmt::Display for MissingKeyError {
|
||||
)?;
|
||||
|
||||
if let Some(ref ancestor_lsn) = self.ancestor_lsn {
|
||||
write!(f, ", ancestor {}", ancestor_lsn)?;
|
||||
write!(f, ", ancestor {ancestor_lsn}")?;
|
||||
}
|
||||
|
||||
if let Some(ref query) = self.query {
|
||||
write!(f, ", query {}", query)?;
|
||||
write!(f, ", query {query}")?;
|
||||
}
|
||||
|
||||
if let Some(ref read_path) = self.read_path {
|
||||
write!(f, "\n{}", read_path)?;
|
||||
write!(f, "\n{read_path}")?;
|
||||
}
|
||||
|
||||
if let Some(ref backtrace) = self.backtrace {
|
||||
write!(f, "\n{}", backtrace)?;
|
||||
write!(f, "\n{backtrace}")?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -816,7 +816,7 @@ impl From<layer_manager::Shutdown> for FlushLayerError {
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum GetVectoredError {
|
||||
pub enum GetVectoredError {
|
||||
#[error("timeline shutting down")]
|
||||
Cancelled,
|
||||
|
||||
@@ -849,7 +849,7 @@ impl From<GetReadyAncestorError> for GetVectoredError {
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum GetReadyAncestorError {
|
||||
pub enum GetReadyAncestorError {
|
||||
#[error("ancestor LSN wait error")]
|
||||
AncestorLsnTimeout(#[from] WaitLsnError),
|
||||
|
||||
@@ -939,7 +939,7 @@ impl std::fmt::Debug for Timeline {
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug, Clone)]
|
||||
pub(crate) enum WaitLsnError {
|
||||
pub enum WaitLsnError {
|
||||
// Called on a timeline which is shutting down
|
||||
#[error("Shutdown")]
|
||||
Shutdown,
|
||||
@@ -1902,16 +1902,11 @@ impl Timeline {
|
||||
return;
|
||||
};
|
||||
|
||||
let Some(current_size) = open_layer.try_len() else {
|
||||
// Unexpected: since we hold the write guard, nobody else should be writing to this layer, so
|
||||
// read lock to get size should always succeed.
|
||||
tracing::warn!("Lock conflict while reading size of open layer");
|
||||
return;
|
||||
};
|
||||
let current_size = open_layer.len();
|
||||
|
||||
let current_lsn = self.get_last_record_lsn();
|
||||
|
||||
let checkpoint_distance_override = open_layer.tick().await;
|
||||
let checkpoint_distance_override = open_layer.tick();
|
||||
|
||||
if let Some(size_override) = checkpoint_distance_override {
|
||||
if current_size > size_override {
|
||||
@@ -7184,9 +7179,7 @@ impl Timeline {
|
||||
if let Some(end) = layer_end_lsn {
|
||||
assert!(
|
||||
end <= last_record_lsn,
|
||||
"advance last record lsn before inserting a layer, end_lsn={}, last_record_lsn={}",
|
||||
end,
|
||||
last_record_lsn,
|
||||
"advance last record lsn before inserting a layer, end_lsn={end}, last_record_lsn={last_record_lsn}",
|
||||
);
|
||||
}
|
||||
|
||||
@@ -7372,7 +7365,7 @@ impl TimelineWriter<'_> {
|
||||
.tl
|
||||
.get_layer_for_write(at, &self.write_guard, ctx)
|
||||
.await?;
|
||||
let initial_size = layer.size().await?;
|
||||
let initial_size = layer.len();
|
||||
|
||||
let last_freeze_at = self.last_freeze_at.load();
|
||||
self.write_guard.replace(TimelineWriterState::new(
|
||||
|
||||
@@ -977,7 +977,7 @@ impl KeyHistoryRetention {
|
||||
tline
|
||||
.reconstruct_value(key, lsn, data, RedoAttemptType::GcCompaction)
|
||||
.await
|
||||
.with_context(|| format!("verification failed for key {} at lsn {}", key, lsn))?;
|
||||
.with_context(|| format!("verification failed for key {key} at lsn {lsn}"))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -2647,15 +2647,15 @@ impl Timeline {
|
||||
use std::fmt::Write;
|
||||
let mut output = String::new();
|
||||
if let Some((key, _, _)) = replay_history.first() {
|
||||
write!(output, "key={} ", key).unwrap();
|
||||
write!(output, "key={key} ").unwrap();
|
||||
let mut cnt = 0;
|
||||
for (_, lsn, val) in replay_history {
|
||||
if val.is_image() {
|
||||
write!(output, "i@{} ", lsn).unwrap();
|
||||
write!(output, "i@{lsn} ").unwrap();
|
||||
} else if val.will_init() {
|
||||
write!(output, "di@{} ", lsn).unwrap();
|
||||
write!(output, "di@{lsn} ").unwrap();
|
||||
} else {
|
||||
write!(output, "d@{} ", lsn).unwrap();
|
||||
write!(output, "d@{lsn} ").unwrap();
|
||||
}
|
||||
cnt += 1;
|
||||
if cnt >= 128 {
|
||||
|
||||
@@ -275,12 +275,20 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
let copy_stream = replication_client.copy_both_simple(&query).await?;
|
||||
let mut physical_stream = pin!(ReplicationStream::new(copy_stream));
|
||||
|
||||
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx)
|
||||
.await
|
||||
.map_err(|e| match e.kind {
|
||||
crate::walingest::WalIngestErrorKind::Cancelled => WalReceiverError::Cancelled,
|
||||
_ => WalReceiverError::Other(e.into()),
|
||||
})?;
|
||||
let walingest_future = WalIngest::new(timeline.as_ref(), startpoint, &ctx);
|
||||
let walingest_res = select! {
|
||||
walingest_res = walingest_future => walingest_res,
|
||||
_ = cancellation.cancelled() => {
|
||||
// We are doing reads in WalIngest::new, and those can hang as they come from the network.
|
||||
// Timeline cancellation hits the walreceiver cancellation token before it hits the timeline global one.
|
||||
debug!("Connection cancelled");
|
||||
return Err(WalReceiverError::Cancelled);
|
||||
},
|
||||
};
|
||||
let mut walingest = walingest_res.map_err(|e| match e.kind {
|
||||
crate::walingest::WalIngestErrorKind::Cancelled => WalReceiverError::Cancelled,
|
||||
_ => WalReceiverError::Other(e.into()),
|
||||
})?;
|
||||
|
||||
let (format, compression) = match protocol {
|
||||
PostgresClientProtocol::Interpreted {
|
||||
@@ -360,8 +368,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
match raw_wal_start_lsn.cmp(&expected_wal_start) {
|
||||
std::cmp::Ordering::Greater => {
|
||||
let msg = format!(
|
||||
"Gap in streamed WAL: [{}, {})",
|
||||
expected_wal_start, raw_wal_start_lsn
|
||||
"Gap in streamed WAL: [{expected_wal_start}, {raw_wal_start_lsn})"
|
||||
);
|
||||
critical!("{msg}");
|
||||
return Err(WalReceiverError::Other(anyhow!(msg)));
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user