Compare commits

...

7 Commits

Author SHA1 Message Date
Mikhail Kot
8004abbb9c fix 2025-06-24 12:50:59 +01:00
Mikhail Kot
d817dbab37 style 2025-06-24 12:41:30 +01:00
Mikhail Kot
a85cd674d2 NO_CLOSE_RANGE macro for old glibc/kernel builds 2025-06-24 12:41:30 +01:00
Arpad Müller
552249607d apply clippy fixes for 1.88.0 beta (#12331)
The 1.88.0 stable release is near (this Thursday). We'd like to fix most
warnings beforehand so that the compiler upgrade doesn't require
approval from too many teams.

This is therefore a preparation PR (like similar PRs before it).

There is a lot of changes for this release, mostly because the
`uninlined_format_args` lint has been added to the `style` lint group.
One can read more about the lint
[here](https://rust-lang.github.io/rust-clippy/master/#/uninlined_format_args).

The PR is the result of `cargo +beta clippy --fix` and `cargo fmt`. One
remaining warning is left for the proxy team.

---------

Co-authored-by: Conrad Ludgate <conrad@neon.tech>
2025-06-24 10:12:42 +00:00
Ivan Efremov
a29772bf6e Create proxy-bench periodic run in CI (#12242)
Currently run for test only via pushing to the test-proxy-bench branch.

Relates to the #22681
2025-06-24 09:54:43 +00:00
Arpad Müller
0efff1db26 Allow cancellation errors in tests that allow timeline deletion errors (#12315)
After merging of PR https://github.com/neondatabase/neon/pull/11712 we
saw some tests be flaky, with errors showing up about the timeline
having been cancelled instead of having been deleted. This is an outcome
that is inherently racy with the "has been deleted" error.

In some instances, https://github.com/neondatabase/neon/pull/11712 has
already added the error about the timeline having been cancelled. This
PR adds them to the remaining instances of
https://github.com/neondatabase/neon/pull/11712, fixing the flakiness.
2025-06-23 22:26:38 +00:00
Aleksandr Sarantsev
5eecde461d storcon: Fix migration for Attached(0) tenants (#12256)
## Problem

`Attached(0)` tenant migrations can get stuck if the heatmap file has
not been uploaded.

## Summary of Changes

- Added a test to reproduce the issue.
- Introduced a `kick_secondary_downloads` config flag:
  - Enabled in testing environments.
  - Disabled in production (and in the new test).
- Updated `Attached(0)` locations to consider the number of secondaries
in their intent when deciding whether to download the heatmap.
2025-06-23 18:55:26 +00:00
122 changed files with 820 additions and 562 deletions

83
.github/workflows/proxy-benchmark.yml vendored Normal file
View File

@@ -0,0 +1,83 @@
name: Periodic proxy performance test on unit-perf hetzner runner
on:
push: # TODO: remove after testing
branches:
- test-proxy-bench # Runs on pushes to branches starting with test-proxy-bench
# schedule:
# * is a special character in YAML so you have to quote this string
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
# - cron: '0 5 * * *' # Runs at 5 UTC once a day
workflow_dispatch: # adds an ability to run this manually
defaults:
run:
shell: bash -euo pipefail {0}
concurrency:
group: ${{ github.workflow }}
cancel-in-progress: false
permissions:
contents: read
jobs:
run_periodic_proxybench_test:
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
contents: write
pull-requests: write
runs-on: [self-hosted, unit-perf]
timeout-minutes: 60 # 1h timeout
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
steps:
- name: Checkout proxy-bench Repo
uses: actions/checkout@v4
with:
repository: neondatabase/proxy-bench
path: proxy-bench
- name: Set up the environment which depends on $RUNNER_TEMP on nvme drive
id: set-env
shell: bash -euxo pipefail {0}
run: |
PROXY_BENCH_PATH=$(realpath ./proxy-bench)
{
echo "PROXY_BENCH_PATH=$PROXY_BENCH_PATH"
echo "NEON_DIR=${RUNNER_TEMP}/neon"
echo "TEST_OUTPUT=${PROXY_BENCH_PATH}/test_output"
echo ""
} >> "$GITHUB_ENV"
- name: Run proxy-bench
run: ./${PROXY_BENCH_PATH}/run.sh
- name: Ingest Bench Results # neon repo script
if: success()
run: |
mkdir -p $TEST_OUTPUT
python $NEON_DIR/scripts/proxy_bench_results_ingest.py --out $TEST_OUTPUT
- name: Push Metrics to Proxy perf database
if: success()
env:
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PROXY_TEST_RESULT_CONNSTR }}"
REPORT_FROM: $TEST_OUTPUT
run: $NEON_DIR/scripts/generate_and_push_perf_report.sh
- name: Docker cleanup
run: docker compose down
- name: Notify Failure
if: failure()
run: echo "Proxy bench job failed" && exit 1

View File

@@ -486,10 +486,8 @@ async fn cmd_pgdata(
};
let superuser = "cloud_admin";
let destination_connstring = format!(
"host=localhost port={} user={} dbname=neondb",
pg_port, superuser
);
let destination_connstring =
format!("host=localhost port={pg_port} user={superuser} dbname=neondb");
let pgdata_dir = workdir.join("pgdata");
let mut proc = PostgresProcess::new(pgdata_dir.clone(), pg_bin_dir.clone(), pg_lib_dir.clone());

View File

@@ -69,7 +69,7 @@ impl clap::builder::TypedValueParser for S3Uri {
S3Uri::from_str(value_str).map_err(|e| {
clap::Error::raw(
clap::error::ErrorKind::InvalidValue,
format!("Failed to parse S3 URI: {}", e),
format!("Failed to parse S3 URI: {e}"),
)
})
}

View File

@@ -22,7 +22,7 @@ pub async fn get_dbs_and_roles(compute: &Arc<ComputeNode>) -> anyhow::Result<Cat
spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});
@@ -119,7 +119,7 @@ pub async fn get_database_schema(
_ => {
let mut lines = stderr_reader.lines();
if let Some(line) = lines.next_line().await? {
if line.contains(&format!("FATAL: database \"{}\" does not exist", dbname)) {
if line.contains(&format!("FATAL: database \"{dbname}\" does not exist")) {
return Err(SchemaDumpError::DatabaseDoesNotExist);
}
warn!("pg_dump stderr: {}", line)

View File

@@ -250,8 +250,7 @@ impl ParsedSpec {
// duplicate entry?
if current == previous {
return Err(format!(
"duplicate entry in safekeeper_connstrings: {}!",
current,
"duplicate entry in safekeeper_connstrings: {current}!",
));
}
@@ -410,7 +409,7 @@ impl ComputeNode {
let options = match conn_conf.get_options() {
// Allow the control plane to override any options set by the
// compute
Some(options) => format!("{} {}", EXTRA_OPTIONS, options),
Some(options) => format!("{EXTRA_OPTIONS} {options}"),
None => EXTRA_OPTIONS.to_string(),
};
conn_conf.options(&options);
@@ -1127,7 +1126,7 @@ impl ComputeNode {
let sk_configs = sk_connstrs.into_iter().map(|connstr| {
// Format connstr
let id = connstr.clone();
let connstr = format!("postgresql://no_user@{}", connstr);
let connstr = format!("postgresql://no_user@{connstr}");
let options = format!(
"-c timeline_id={} tenant_id={}",
pspec.timeline_id, pspec.tenant_id
@@ -1490,7 +1489,7 @@ impl ComputeNode {
let (mut client, connection) = conf.connect(NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});
@@ -1633,7 +1632,7 @@ impl ComputeNode {
Ok((mut client, connection)) => {
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});
if let Err(e) = handle_migrations(&mut client).await {
@@ -1937,7 +1936,7 @@ impl ComputeNode {
let (client, connection) = connect_result.unwrap();
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});
let result = client
@@ -2106,7 +2105,7 @@ LIMIT 100",
db_client
.simple_query(&query)
.await
.with_context(|| format!("Failed to execute query: {}", query))?;
.with_context(|| format!("Failed to execute query: {query}"))?;
}
Ok(())
@@ -2133,7 +2132,7 @@ LIMIT 100",
let version: Option<ExtVersion> = db_client
.query_opt(version_query, &[&ext_name])
.await
.with_context(|| format!("Failed to execute query: {}", version_query))?
.with_context(|| format!("Failed to execute query: {version_query}"))?
.map(|row| row.get(0));
// sanitize the inputs as postgres idents.
@@ -2148,14 +2147,14 @@ LIMIT 100",
db_client
.simple_query(&query)
.await
.with_context(|| format!("Failed to execute query: {}", query))?;
.with_context(|| format!("Failed to execute query: {query}"))?;
} else {
let query =
format!("CREATE EXTENSION IF NOT EXISTS {ext_name} WITH VERSION {quoted_version}");
db_client
.simple_query(&query)
.await
.with_context(|| format!("Failed to execute query: {}", query))?;
.with_context(|| format!("Failed to execute query: {query}"))?;
}
Ok(ext_version)

View File

@@ -51,7 +51,7 @@ pub fn write_postgres_conf(
// Write the postgresql.conf content from the spec file as is.
if let Some(conf) = &spec.cluster.postgresql_conf {
writeln!(file, "{}", conf)?;
writeln!(file, "{conf}")?;
}
// Add options for connecting to storage
@@ -70,7 +70,7 @@ pub fn write_postgres_conf(
);
// If generation is given, prepend sk list with g#number:
if let Some(generation) = spec.safekeepers_generation {
write!(neon_safekeepers_value, "g#{}:", generation)?;
write!(neon_safekeepers_value, "g#{generation}:")?;
}
neon_safekeepers_value.push_str(&spec.safekeeper_connstrings.join(","));
writeln!(
@@ -109,8 +109,8 @@ pub fn write_postgres_conf(
tls::update_key_path_blocking(pgdata_path, tls_config);
// these are the default, but good to be explicit.
writeln!(file, "ssl_cert_file = '{}'", SERVER_CRT)?;
writeln!(file, "ssl_key_file = '{}'", SERVER_KEY)?;
writeln!(file, "ssl_cert_file = '{SERVER_CRT}'")?;
writeln!(file, "ssl_key_file = '{SERVER_KEY}'")?;
}
// Locales
@@ -191,8 +191,7 @@ pub fn write_postgres_conf(
}
writeln!(
file,
"shared_preload_libraries='{}{}'",
libs, extra_shared_preload_libraries
"shared_preload_libraries='{libs}{extra_shared_preload_libraries}'"
)?;
} else {
// Typically, this should be unreacheable,
@@ -244,8 +243,7 @@ pub fn write_postgres_conf(
}
writeln!(
file,
"shared_preload_libraries='{}{}'",
libs, extra_shared_preload_libraries
"shared_preload_libraries='{libs}{extra_shared_preload_libraries}'"
)?;
} else {
// Typically, this should be unreacheable,
@@ -263,7 +261,7 @@ pub fn write_postgres_conf(
}
}
writeln!(file, "neon.extension_server_port={}", extension_server_port)?;
writeln!(file, "neon.extension_server_port={extension_server_port}")?;
if spec.drop_subscriptions_before_start {
writeln!(file, "neon.disable_logical_replication_subscribers=true")?;
@@ -291,7 +289,7 @@ where
{
let path = pgdata_path.join("compute_ctl_temp_override.conf");
let mut file = File::create(path)?;
write!(file, "{}", options)?;
write!(file, "{options}")?;
let res = exec();

View File

@@ -310,10 +310,7 @@ async fn download_extension_tar(remote_ext_base_url: &Url, ext_path: &str) -> Re
async fn do_extension_server_request(uri: Url) -> Result<Bytes, (String, String)> {
let resp = reqwest::get(uri).await.map_err(|e| {
(
format!(
"could not perform remote extensions server request: {:?}",
e
),
format!("could not perform remote extensions server request: {e:?}"),
UNKNOWN_HTTP_STATUS.to_string(),
)
})?;
@@ -323,7 +320,7 @@ async fn do_extension_server_request(uri: Url) -> Result<Bytes, (String, String)
StatusCode::OK => match resp.bytes().await {
Ok(resp) => Ok(resp),
Err(e) => Err((
format!("could not read remote extensions server response: {:?}", e),
format!("could not read remote extensions server response: {e:?}"),
// It's fine to return and report error with status as 200 OK,
// because we still failed to read the response.
status.to_string(),
@@ -334,10 +331,7 @@ async fn do_extension_server_request(uri: Url) -> Result<Bytes, (String, String)
status.to_string(),
)),
_ => Err((
format!(
"unexpected remote extensions server response status code: {}",
status
),
format!("unexpected remote extensions server response status code: {status}"),
status.to_string(),
)),
}

View File

@@ -65,7 +65,7 @@ pub(in crate::http) async fn configure(
if state.status == ComputeStatus::Failed {
let err = state.error.as_ref().map_or("unknown error", |x| x);
let msg = format!("compute configuration failed: {:?}", err);
let msg = format!("compute configuration failed: {err:?}");
return Err(msg);
}
}

View File

@@ -43,7 +43,7 @@ pub async fn get_installed_extensions(mut conf: Config) -> Result<InstalledExten
let (mut client, connection) = conf.connect(NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});
@@ -57,7 +57,7 @@ pub async fn get_installed_extensions(mut conf: Config) -> Result<InstalledExten
let (client, connection) = conf.connect(NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});

View File

@@ -130,7 +130,7 @@ fn try_acquire_lsn_lease(
lsn: Lsn,
) -> Result<Option<SystemTime>> {
let mut client = config.connect(NoTls)?;
let cmd = format!("lease lsn {} {} {} ", tenant_shard_id, timeline_id, lsn);
let cmd = format!("lease lsn {tenant_shard_id} {timeline_id} {lsn} ");
let res = client.simple_query(&cmd)?;
let msg = match res.first() {
Some(msg) => msg,

View File

@@ -36,9 +36,9 @@ pub fn escape_literal(s: &str) -> String {
let res = s.replace('\'', "''").replace('\\', "\\\\");
if res.contains('\\') {
format!("E'{}'", res)
format!("E'{res}'")
} else {
format!("'{}'", res)
format!("'{res}'")
}
}
@@ -46,7 +46,7 @@ pub fn escape_literal(s: &str) -> String {
/// with `'{}'` is not required, as it returns a ready-to-use config string.
pub fn escape_conf_value(s: &str) -> String {
let res = s.replace('\'', "''").replace('\\', "\\\\");
format!("'{}'", res)
format!("'{res}'")
}
pub trait GenericOptionExt {
@@ -446,7 +446,7 @@ pub async fn tune_pgbouncer(
let mut pgbouncer_connstr =
"host=localhost port=6432 dbname=pgbouncer user=postgres sslmode=disable".to_string();
if let Ok(pass) = std::env::var("PGBOUNCER_PASSWORD") {
pgbouncer_connstr.push_str(format!(" password={}", pass).as_str());
pgbouncer_connstr.push_str(format!(" password={pass}").as_str());
}
pgbouncer_connstr
};
@@ -464,7 +464,7 @@ pub async fn tune_pgbouncer(
Ok((client, connection)) => {
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});
break client;

View File

@@ -23,12 +23,12 @@ fn do_control_plane_request(
) -> Result<ControlPlaneConfigResponse, (bool, String, String)> {
let resp = reqwest::blocking::Client::new()
.get(uri)
.header("Authorization", format!("Bearer {}", jwt))
.header("Authorization", format!("Bearer {jwt}"))
.send()
.map_err(|e| {
(
true,
format!("could not perform request to control plane: {:?}", e),
format!("could not perform request to control plane: {e:?}"),
UNKNOWN_HTTP_STATUS.to_string(),
)
})?;
@@ -39,7 +39,7 @@ fn do_control_plane_request(
Ok(spec_resp) => Ok(spec_resp),
Err(e) => Err((
true,
format!("could not deserialize control plane response: {:?}", e),
format!("could not deserialize control plane response: {e:?}"),
status.to_string(),
)),
},
@@ -62,7 +62,7 @@ fn do_control_plane_request(
// or some internal failure happened. Doesn't make much sense to retry in this case.
_ => Err((
false,
format!("unexpected control plane response status code: {}", status),
format!("unexpected control plane response status code: {status}"),
status.to_string(),
)),
}

View File

@@ -933,56 +933,53 @@ async fn get_operations<'a>(
PerDatabasePhase::DeleteDBRoleReferences => {
let ctx = ctx.read().await;
let operations =
spec.delta_operations
.iter()
.flatten()
.filter(|op| op.action == "delete_role")
.filter_map(move |op| {
if db.is_owned_by(&op.name) {
return None;
}
if !ctx.roles.contains_key(&op.name) {
return None;
}
let quoted = op.name.pg_quote();
let new_owner = match &db {
DB::SystemDB => PgIdent::from("cloud_admin").pg_quote(),
DB::UserDB(db) => db.owner.pg_quote(),
};
let (escaped_role, outer_tag) = op.name.pg_quote_dollar();
let operations = spec
.delta_operations
.iter()
.flatten()
.filter(|op| op.action == "delete_role")
.filter_map(move |op| {
if db.is_owned_by(&op.name) {
return None;
}
if !ctx.roles.contains_key(&op.name) {
return None;
}
let quoted = op.name.pg_quote();
let new_owner = match &db {
DB::SystemDB => PgIdent::from("cloud_admin").pg_quote(),
DB::UserDB(db) => db.owner.pg_quote(),
};
let (escaped_role, outer_tag) = op.name.pg_quote_dollar();
Some(vec![
// This will reassign all dependent objects to the db owner
Operation {
query: format!(
"REASSIGN OWNED BY {} TO {}",
quoted, new_owner,
),
comment: None,
},
// Revoke some potentially blocking privileges (Neon-specific currently)
Operation {
query: format!(
include_str!("sql/pre_drop_role_revoke_privileges.sql"),
// N.B. this has to be properly dollar-escaped with `pg_quote_dollar()`
role_name = escaped_role,
outer_tag = outer_tag,
),
comment: None,
},
// This now will only drop privileges of the role
// TODO: this is obviously not 100% true because of the above case,
// there could be still some privileges that are not revoked. Maybe this
// only drops privileges that were granted *by this* role, not *to this* role,
// but this has to be checked.
Operation {
query: format!("DROP OWNED BY {}", quoted),
comment: None,
},
])
})
.flatten();
Some(vec![
// This will reassign all dependent objects to the db owner
Operation {
query: format!("REASSIGN OWNED BY {quoted} TO {new_owner}",),
comment: None,
},
// Revoke some potentially blocking privileges (Neon-specific currently)
Operation {
query: format!(
include_str!("sql/pre_drop_role_revoke_privileges.sql"),
// N.B. this has to be properly dollar-escaped with `pg_quote_dollar()`
role_name = escaped_role,
outer_tag = outer_tag,
),
comment: None,
},
// This now will only drop privileges of the role
// TODO: this is obviously not 100% true because of the above case,
// there could be still some privileges that are not revoked. Maybe this
// only drops privileges that were granted *by this* role, not *to this* role,
// but this has to be checked.
Operation {
query: format!("DROP OWNED BY {quoted}"),
comment: None,
},
])
})
.flatten();
Ok(Box::new(operations))
}

View File

@@ -27,7 +27,7 @@ pub async fn ping_safekeeper(
let (client, conn) = config.connect(tokio_postgres::NoTls).await?;
tokio::spawn(async move {
if let Err(e) = conn.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});

View File

@@ -919,7 +919,7 @@ fn print_timeline(
br_sym = "┗━";
}
print!("{} @{}: ", br_sym, ancestor_lsn);
print!("{br_sym} @{ancestor_lsn}: ");
}
// Finally print a timeline id and name with new line
@@ -1742,7 +1742,7 @@ async fn handle_pageserver(subcmd: &PageserverCmd, env: &local_env::LocalEnv) ->
StopMode::Immediate => true,
};
if let Err(e) = get_pageserver(env, args.pageserver_id)?.stop(immediate) {
eprintln!("pageserver stop failed: {}", e);
eprintln!("pageserver stop failed: {e}");
exit(1);
}
}
@@ -1751,7 +1751,7 @@ async fn handle_pageserver(subcmd: &PageserverCmd, env: &local_env::LocalEnv) ->
let pageserver = get_pageserver(env, args.pageserver_id)?;
//TODO what shutdown strategy should we use here?
if let Err(e) = pageserver.stop(false) {
eprintln!("pageserver stop failed: {}", e);
eprintln!("pageserver stop failed: {e}");
exit(1);
}
@@ -1768,7 +1768,7 @@ async fn handle_pageserver(subcmd: &PageserverCmd, env: &local_env::LocalEnv) ->
{
Ok(_) => println!("Page server is up and running"),
Err(err) => {
eprintln!("Page server is not available: {}", err);
eprintln!("Page server is not available: {err}");
exit(1);
}
}
@@ -1805,7 +1805,7 @@ async fn handle_storage_controller(
},
};
if let Err(e) = svc.stop(stop_args).await {
eprintln!("stop failed: {}", e);
eprintln!("stop failed: {e}");
exit(1);
}
}
@@ -1827,7 +1827,7 @@ async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) ->
let safekeeper = get_safekeeper(env, args.id)?;
if let Err(e) = safekeeper.start(&args.extra_opt, &args.start_timeout).await {
eprintln!("safekeeper start failed: {}", e);
eprintln!("safekeeper start failed: {e}");
exit(1);
}
}
@@ -1839,7 +1839,7 @@ async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) ->
StopMode::Immediate => true,
};
if let Err(e) = safekeeper.stop(immediate) {
eprintln!("safekeeper stop failed: {}", e);
eprintln!("safekeeper stop failed: {e}");
exit(1);
}
}
@@ -1852,12 +1852,12 @@ async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) ->
};
if let Err(e) = safekeeper.stop(immediate) {
eprintln!("safekeeper stop failed: {}", e);
eprintln!("safekeeper stop failed: {e}");
exit(1);
}
if let Err(e) = safekeeper.start(&args.extra_opt, &args.start_timeout).await {
eprintln!("safekeeper start failed: {}", e);
eprintln!("safekeeper start failed: {e}");
exit(1);
}
}
@@ -2113,7 +2113,7 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
let storage = EndpointStorage::from_env(env);
if let Err(e) = storage.stop(immediate) {
eprintln!("endpoint_storage stop failed: {:#}", e);
eprintln!("endpoint_storage stop failed: {e:#}");
}
for ps_conf in &env.pageservers {

View File

@@ -846,10 +846,10 @@ impl Endpoint {
// Launch compute_ctl
let conn_str = self.connstr("cloud_admin", "postgres");
println!("Starting postgres node at '{}'", conn_str);
println!("Starting postgres node at '{conn_str}'");
if create_test_user {
let conn_str = self.connstr("test", "neondb");
println!("Also at '{}'", conn_str);
println!("Also at '{conn_str}'");
}
let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
cmd.args([
@@ -948,8 +948,7 @@ impl Endpoint {
Err(e) => {
if Instant::now().duration_since(start_at) > start_timeout {
return Err(e).context(format!(
"timed out {:?} waiting to connect to compute_ctl HTTP",
start_timeout,
"timed out {start_timeout:?} waiting to connect to compute_ctl HTTP",
));
}
}
@@ -988,7 +987,7 @@ impl Endpoint {
// reqwest does not export its error construction utility functions, so let's craft the message ourselves
let url = response.url().to_owned();
let msg = match response.text().await {
Ok(err_body) => format!("Error: {}", err_body),
Ok(err_body) => format!("Error: {err_body}"),
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
};
Err(anyhow::anyhow!(msg))
@@ -1054,7 +1053,7 @@ impl Endpoint {
} else {
let url = response.url().to_owned();
let msg = match response.text().await {
Ok(err_body) => format!("Error: {}", err_body),
Ok(err_body) => format!("Error: {err_body}"),
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
};
Err(anyhow::anyhow!(msg))

View File

@@ -211,6 +211,8 @@ pub struct NeonStorageControllerConf {
pub use_local_compute_notifications: bool,
pub timeline_safekeeper_count: Option<i64>,
pub kick_secondary_downloads: Option<bool>,
}
impl NeonStorageControllerConf {
@@ -242,6 +244,7 @@ impl Default for NeonStorageControllerConf {
use_https_safekeeper_api: false,
use_local_compute_notifications: true,
timeline_safekeeper_count: None,
kick_secondary_downloads: None,
}
}
}
@@ -257,7 +260,7 @@ impl Default for EndpointStorageConf {
impl NeonBroker {
pub fn client_url(&self) -> Url {
let url = if let Some(addr) = self.listen_https_addr {
format!("https://{}", addr)
format!("https://{addr}")
} else {
format!(
"http://{}",
@@ -730,7 +733,7 @@ impl LocalEnv {
let config_toml_path = dentry.path().join("pageserver.toml");
let config_toml: PageserverConfigTomlSubset = toml_edit::de::from_str(
&std::fs::read_to_string(&config_toml_path)
.with_context(|| format!("read {:?}", config_toml_path))?,
.with_context(|| format!("read {config_toml_path:?}"))?,
)
.context("parse pageserver.toml")?;
let identity_toml_path = dentry.path().join("identity.toml");
@@ -740,7 +743,7 @@ impl LocalEnv {
}
let identity_toml: IdentityTomlSubset = toml_edit::de::from_str(
&std::fs::read_to_string(&identity_toml_path)
.with_context(|| format!("read {:?}", identity_toml_path))?,
.with_context(|| format!("read {identity_toml_path:?}"))?,
)
.context("parse identity.toml")?;
let PageserverConfigTomlSubset {

View File

@@ -121,7 +121,7 @@ impl PageServerNode {
.env
.generate_auth_token(&Claims::new(None, Scope::GenerationsApi))
.unwrap();
overrides.push(format!("control_plane_api_token='{}'", jwt_token));
overrides.push(format!("control_plane_api_token='{jwt_token}'"));
}
if !conf.other.contains_key("remote_storage") {

View File

@@ -143,7 +143,7 @@ impl SafekeeperNode {
let id_string = id.to_string();
// TODO: add availability_zone to the config.
// Right now we just specify any value here and use it to check metrics in tests.
let availability_zone = format!("sk-{}", id_string);
let availability_zone = format!("sk-{id_string}");
let mut args = vec![
"-D".to_owned(),

View File

@@ -167,7 +167,7 @@ impl StorageController {
fn storage_controller_instance_dir(&self, instance_id: u8) -> PathBuf {
self.env
.base_data_dir
.join(format!("storage_controller_{}", instance_id))
.join(format!("storage_controller_{instance_id}"))
}
fn pid_file(&self, instance_id: u8) -> Utf8PathBuf {
@@ -220,7 +220,7 @@ impl StorageController {
"-d",
DB_NAME,
"-p",
&format!("{}", postgres_port),
&format!("{postgres_port}"),
];
let pg_lib_dir = self.get_pg_lib_dir().await.unwrap();
let envs = [
@@ -263,7 +263,7 @@ impl StorageController {
"-h",
"localhost",
"-p",
&format!("{}", postgres_port),
&format!("{postgres_port}"),
"-U",
&username(),
"-O",
@@ -425,7 +425,7 @@ impl StorageController {
// from `LocalEnv`'s config file (`.neon/config`).
tokio::fs::write(
&pg_data_path.join("postgresql.conf"),
format!("port = {}\nfsync=off\n", postgres_port),
format!("port = {postgres_port}\nfsync=off\n"),
)
.await?;
@@ -477,7 +477,7 @@ impl StorageController {
self.setup_database(postgres_port).await?;
}
let database_url = format!("postgresql://localhost:{}/{DB_NAME}", postgres_port);
let database_url = format!("postgresql://localhost:{postgres_port}/{DB_NAME}");
// We support running a startup SQL script to fiddle with the database before we launch storcon.
// This is used by the test suite.
@@ -508,7 +508,7 @@ impl StorageController {
drop(client);
conn.await??;
let addr = format!("{}:{}", host, listen_port);
let addr = format!("{host}:{listen_port}");
let address_for_peers = Uri::builder()
.scheme(scheme)
.authority(addr.clone())
@@ -557,6 +557,10 @@ impl StorageController {
args.push("--use-local-compute-notifications".to_string());
}
if let Some(value) = self.config.kick_secondary_downloads {
args.push(format!("--kick-secondary-downloads={value}"));
}
if let Some(ssl_ca_file) = self.env.ssl_ca_cert_path() {
args.push(format!("--ssl-ca-file={}", ssl_ca_file.to_str().unwrap()));
}
@@ -806,9 +810,9 @@ impl StorageController {
builder = builder.json(&body)
}
if let Some(private_key) = &self.private_key {
println!("Getting claims for path {}", path);
println!("Getting claims for path {path}");
if let Some(required_claims) = Self::get_claims_for_path(&path)? {
println!("Got claims {:?} for path {}", required_claims, path);
println!("Got claims {required_claims:?} for path {path}");
let jwt_token = encode_from_key_file(&required_claims, private_key)?;
builder = builder.header(
reqwest::header::AUTHORIZATION,

View File

@@ -649,7 +649,7 @@ async fn main() -> anyhow::Result<()> {
response
.new_shards
.iter()
.map(|s| format!("{:?}", s))
.map(|s| format!("{s:?}"))
.collect::<Vec<_>>()
.join(",")
);
@@ -771,8 +771,8 @@ async fn main() -> anyhow::Result<()> {
println!("Tenant {tenant_id}");
let mut table = comfy_table::Table::new();
table.add_row(["Policy", &format!("{:?}", policy)]);
table.add_row(["Stripe size", &format!("{:?}", stripe_size)]);
table.add_row(["Policy", &format!("{policy:?}")]);
table.add_row(["Stripe size", &format!("{stripe_size:?}")]);
table.add_row(["Config", &serde_json::to_string_pretty(&config).unwrap()]);
println!("{table}");
println!("Shards:");
@@ -789,7 +789,7 @@ async fn main() -> anyhow::Result<()> {
let secondary = shard
.node_secondary
.iter()
.map(|n| format!("{}", n))
.map(|n| format!("{n}"))
.collect::<Vec<_>>()
.join(",");
@@ -863,7 +863,7 @@ async fn main() -> anyhow::Result<()> {
}
} else {
// Make it obvious to the user that since they've omitted an AZ, we're clearing it
eprintln!("Clearing preferred AZ for tenant {}", tenant_id);
eprintln!("Clearing preferred AZ for tenant {tenant_id}");
}
// Construct a request that modifies all the tenant's shards
@@ -1134,8 +1134,7 @@ async fn main() -> anyhow::Result<()> {
Err((tenant_shard_id, from, to, error)) => {
failure += 1;
println!(
"Failed to migrate {} from node {} to node {}: {}",
tenant_shard_id, from, to, error
"Failed to migrate {tenant_shard_id} from node {from} to node {to}: {error}"
);
}
}
@@ -1277,8 +1276,7 @@ async fn main() -> anyhow::Result<()> {
concurrency,
} => {
let mut path = format!(
"/v1/tenant/{}/timeline/{}/download_heatmap_layers",
tenant_shard_id, timeline_id,
"/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers",
);
if let Some(c) = concurrency {
@@ -1303,8 +1301,7 @@ async fn watch_tenant_shard(
) -> anyhow::Result<()> {
if let Some(until_migrated_to) = until_migrated_to {
println!(
"Waiting for tenant shard {} to be migrated to node {}",
tenant_shard_id, until_migrated_to
"Waiting for tenant shard {tenant_shard_id} to be migrated to node {until_migrated_to}"
);
}
@@ -1327,7 +1324,7 @@ async fn watch_tenant_shard(
"attached: {} secondary: {} {}",
shard
.node_attached
.map(|n| format!("{}", n))
.map(|n| format!("{n}"))
.unwrap_or("none".to_string()),
shard
.node_secondary
@@ -1341,15 +1338,12 @@ async fn watch_tenant_shard(
"(reconciler idle)"
}
);
println!("{}", summary);
println!("{summary}");
// Maybe drop out if we finished migration
if let Some(until_migrated_to) = until_migrated_to {
if shard.node_attached == Some(until_migrated_to) && !shard.is_reconciling {
println!(
"Tenant shard {} is now on node {}",
tenant_shard_id, until_migrated_to
);
println!("Tenant shard {tenant_shard_id} is now on node {until_migrated_to}");
break;
}
}

View File

@@ -374,7 +374,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
let request = Request::builder()
.uri(format!("/{tenant}/{timeline}/{endpoint}/sub/path/key"))
.method(method)
.header("Authorization", format!("Bearer {}", token))
.header("Authorization", format!("Bearer {token}"))
.body(Body::empty())
.unwrap();
let status = ServiceExt::ready(&mut app)

View File

@@ -71,7 +71,7 @@ impl Runtime {
debug!("thread panicked: {:?}", e);
let mut result = ctx.result.lock();
if result.0 == -1 {
*result = (256, format!("thread panicked: {:?}", e));
*result = (256, format!("thread panicked: {e:?}"));
}
});
}

View File

@@ -47,8 +47,8 @@ impl Debug for AnyMessage {
match self {
AnyMessage::None => write!(f, "None"),
AnyMessage::InternalConnect => write!(f, "InternalConnect"),
AnyMessage::Just32(v) => write!(f, "Just32({})", v),
AnyMessage::ReplCell(v) => write!(f, "ReplCell({:?})", v),
AnyMessage::Just32(v) => write!(f, "Just32({v})"),
AnyMessage::ReplCell(v) => write!(f, "ReplCell({v:?})"),
AnyMessage::Bytes(v) => write!(f, "Bytes({})", hex::encode(v)),
AnyMessage::LSN(v) => write!(f, "LSN({})", Lsn(*v)),
}

View File

@@ -582,14 +582,14 @@ pub fn attach_openapi_ui(
deepLinking: true,
showExtensions: true,
showCommonExtensions: true,
url: "{}",
url: "{spec_mount_path}",
}})
window.ui = ui;
}};
</script>
</body>
</html>
"#, spec_mount_path))).unwrap())
"#))).unwrap())
})
)
}
@@ -696,7 +696,7 @@ mod tests {
let remote_addr = SocketAddr::new(IpAddr::from_str("127.0.0.1").unwrap(), 80);
let mut service = builder.build(remote_addr);
if let Err(e) = poll_fn(|ctx| service.poll_ready(ctx)).await {
panic!("request service is not ready: {:?}", e);
panic!("request service is not ready: {e:?}");
}
let mut req: Request<Body> = Request::default();
@@ -716,7 +716,7 @@ mod tests {
let remote_addr = SocketAddr::new(IpAddr::from_str("127.0.0.1").unwrap(), 80);
let mut service = builder.build(remote_addr);
if let Err(e) = poll_fn(|ctx| service.poll_ready(ctx)).await {
panic!("request service is not ready: {:?}", e);
panic!("request service is not ready: {e:?}");
}
let req: Request<Body> = Request::default();

View File

@@ -86,7 +86,7 @@ impl ShmemHandle {
// somewhat smaller than that, because with anything close to that, you'll run out of
// memory anyway.
if max_size >= 1 << 48 {
panic!("max size {} too large", max_size);
panic!("max size {max_size} too large");
}
if initial_size > max_size {
panic!("initial size {initial_size} larger than max size {max_size}");
@@ -279,7 +279,7 @@ mod tests {
fn assert_range(ptr: *const u8, expected: u8, range: Range<usize>) {
for i in range {
let b = unsafe { *(ptr.add(i)) };
assert_eq!(expected, b, "unexpected byte at offset {}", i);
assert_eq!(expected, b, "unexpected byte at offset {i}");
}
}

View File

@@ -577,8 +577,7 @@ mod test {
let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
assert!(
err.to_string().contains("unknown field `unknown_field`"),
"expect unknown field `unknown_field` error, got: {}",
err
"expect unknown field `unknown_field` error, got: {err}"
);
}

View File

@@ -334,8 +334,7 @@ impl KeySpace {
std::cmp::max(range.start, prev.start) < std::cmp::min(range.end, prev.end);
assert!(
!overlap,
"Attempt to merge ovelapping keyspaces: {:?} overlaps {:?}",
prev, range
"Attempt to merge ovelapping keyspaces: {prev:?} overlaps {range:?}"
);
}
@@ -1104,7 +1103,7 @@ mod tests {
// total range contains at least one shard-local page
let all_nonzero = fragments.iter().all(|f| f.0 > 0);
if !all_nonzero {
eprintln!("Found a zero-length fragment: {:?}", fragments);
eprintln!("Found a zero-length fragment: {fragments:?}");
}
assert!(all_nonzero);
} else {

View File

@@ -1182,7 +1182,7 @@ impl Display for ImageCompressionAlgorithm {
ImageCompressionAlgorithm::Disabled => write!(f, "disabled"),
ImageCompressionAlgorithm::Zstd { level } => {
if let Some(level) = level {
write!(f, "zstd({})", level)
write!(f, "zstd({level})")
} else {
write!(f, "zstd")
}
@@ -2011,8 +2011,7 @@ mod tests {
let err = serde_json::from_value::<TenantConfigRequest>(config_request).unwrap_err();
assert!(
err.to_string().contains("unknown field `unknown_field`"),
"expect unknown field `unknown_field` error, got: {}",
err
"expect unknown field `unknown_field` error, got: {err}"
);
}

View File

@@ -939,7 +939,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackendReader<IO> {
FeMessage::CopyFail => Err(CopyStreamHandlerEnd::CopyFail),
FeMessage::Terminate => Err(CopyStreamHandlerEnd::Terminate),
_ => Err(CopyStreamHandlerEnd::from(ConnectionError::Protocol(
ProtocolError::Protocol(format!("unexpected message in COPY stream {:?}", msg)),
ProtocolError::Protocol(format!("unexpected message in COPY stream {msg:?}")),
))),
},
None => Err(CopyStreamHandlerEnd::EOF),

View File

@@ -61,7 +61,7 @@ async fn simple_select() {
// so spawn it off to run on its own.
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});
@@ -137,7 +137,7 @@ async fn simple_select_ssl() {
// so spawn it off to run on its own.
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
eprintln!("connection error: {e}");
}
});

View File

@@ -223,7 +223,7 @@ mod tests_pg_connection_config {
assert_eq!(cfg.port(), 123);
assert_eq!(cfg.raw_address(), "stub.host.example:123");
assert_eq!(
format!("{:?}", cfg),
format!("{cfg:?}"),
"PgConnectionConfig { host: Domain(\"stub.host.example\"), port: 123, password: None }"
);
}
@@ -239,7 +239,7 @@ mod tests_pg_connection_config {
assert_eq!(cfg.port(), 123);
assert_eq!(cfg.raw_address(), "[::1]:123");
assert_eq!(
format!("{:?}", cfg),
format!("{cfg:?}"),
"PgConnectionConfig { host: Ipv6(::1), port: 123, password: None }"
);
}
@@ -252,7 +252,7 @@ mod tests_pg_connection_config {
assert_eq!(cfg.port(), 123);
assert_eq!(cfg.raw_address(), "stub.host.example:123");
assert_eq!(
format!("{:?}", cfg),
format!("{cfg:?}"),
"PgConnectionConfig { host: Domain(\"stub.host.example\"), port: 123, password: Some(REDACTED-STRING) }"
);
}

View File

@@ -114,7 +114,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder {
let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf).map_err(
|e| WalDecodeError {
msg: format!("long header deserialization failed {}", e),
msg: format!("long header deserialization failed {e}"),
lsn: self.lsn,
},
)?;
@@ -130,7 +130,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder {
let hdr =
XLogPageHeaderData::from_bytes(&mut self.inputbuf).map_err(|e| {
WalDecodeError {
msg: format!("header deserialization failed {}", e),
msg: format!("header deserialization failed {e}"),
lsn: self.lsn,
}
})?;
@@ -155,7 +155,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder {
let xl_tot_len = (&self.inputbuf[0..4]).get_u32_le();
if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
return Err(WalDecodeError {
msg: format!("invalid xl_tot_len {}", xl_tot_len),
msg: format!("invalid xl_tot_len {xl_tot_len}"),
lsn: self.lsn,
});
}
@@ -218,7 +218,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder {
let xlogrec =
XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]).map_err(|e| {
WalDecodeError {
msg: format!("xlog record deserialization failed {}", e),
msg: format!("xlog record deserialization failed {e}"),
lsn: self.lsn,
}
})?;

View File

@@ -1199,7 +1199,7 @@ pub fn describe_postgres_wal_record(record: &Bytes) -> Result<String, Deserializ
pg_constants::XLOG_HEAP2_MULTI_INSERT => "HEAP2 MULTI_INSERT",
pg_constants::XLOG_HEAP2_VISIBLE => "HEAP2 VISIBLE",
_ => {
unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
unknown_str = format!("HEAP2 UNKNOWN_0x{info:02x}");
&unknown_str
}
}
@@ -1212,7 +1212,7 @@ pub fn describe_postgres_wal_record(record: &Bytes) -> Result<String, Deserializ
pg_constants::XLOG_HEAP_UPDATE => "HEAP UPDATE",
pg_constants::XLOG_HEAP_HOT_UPDATE => "HEAP HOT_UPDATE",
_ => {
unknown_str = format!("HEAP2 UNKNOWN_0x{:02x}", info);
unknown_str = format!("HEAP2 UNKNOWN_0x{info:02x}");
&unknown_str
}
}
@@ -1223,7 +1223,7 @@ pub fn describe_postgres_wal_record(record: &Bytes) -> Result<String, Deserializ
pg_constants::XLOG_FPI => "XLOG FPI",
pg_constants::XLOG_FPI_FOR_HINT => "XLOG FPI_FOR_HINT",
_ => {
unknown_str = format!("XLOG UNKNOWN_0x{:02x}", info);
unknown_str = format!("XLOG UNKNOWN_0x{info:02x}");
&unknown_str
}
}
@@ -1231,7 +1231,7 @@ pub fn describe_postgres_wal_record(record: &Bytes) -> Result<String, Deserializ
rmid => {
let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
unknown_str = format!("UNKNOWN_RM_{} INFO_0x{:02x}", rmid, info);
unknown_str = format!("UNKNOWN_RM_{rmid} INFO_0x{info:02x}");
&unknown_str
}
};

View File

@@ -34,7 +34,7 @@ fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
let cfg = Conf {
pg_version,
pg_distrib_dir: top_path.join("pg_install"),
datadir: top_path.join(format!("test_output/{}-{PG_MAJORVERSION}", test_name)),
datadir: top_path.join(format!("test_output/{test_name}-{PG_MAJORVERSION}")),
};
if cfg.datadir.exists() {
fs::remove_dir_all(&cfg.datadir).unwrap();

View File

@@ -31,15 +31,15 @@ pub enum Error {
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Error::Spawn(e) => write!(f, "Error spawning command: {:?}", e),
Error::Spawn(e) => write!(f, "Error spawning command: {e:?}"),
Error::Failed { status, stderr } => write!(
f,
"Command failed with status {:?}: {}",
status,
String::from_utf8_lossy(stderr)
),
Error::WaitOutput(e) => write!(f, "Error waiting for command output: {:?}", e),
Error::Other(e) => write!(f, "Error: {:?}", e),
Error::WaitOutput(e) => write!(f, "Error waiting for command output: {e:?}"),
Error::Other(e) => write!(f, "Error: {e:?}"),
}
}
}

View File

@@ -168,15 +168,13 @@ impl FeatureStore {
let PostHogFlagFilterPropertyValue::String(provided) = provided else {
// Left should be a string
return Err(PostHogEvaluationError::Internal(format!(
"The left side of the condition is not a string: {:?}",
provided
"The left side of the condition is not a string: {provided:?}"
)));
};
let PostHogFlagFilterPropertyValue::List(requested) = requested else {
// Right should be a list of string
return Err(PostHogEvaluationError::Internal(format!(
"The right side of the condition is not a list: {:?}",
requested
"The right side of the condition is not a list: {requested:?}"
)));
};
Ok(requested.contains(provided))
@@ -185,14 +183,12 @@ impl FeatureStore {
let PostHogFlagFilterPropertyValue::String(requested) = requested else {
// Right should be a string
return Err(PostHogEvaluationError::Internal(format!(
"The right side of the condition is not a string: {:?}",
requested
"The right side of the condition is not a string: {requested:?}"
)));
};
let Ok(requested) = requested.parse::<f64>() else {
return Err(PostHogEvaluationError::Internal(format!(
"Can not parse the right side of the condition as a number: {:?}",
requested
"Can not parse the right side of the condition as a number: {requested:?}"
)));
};
// Left can either be a number or a string
@@ -201,16 +197,14 @@ impl FeatureStore {
PostHogFlagFilterPropertyValue::String(provided) => {
let Ok(provided) = provided.parse::<f64>() else {
return Err(PostHogEvaluationError::Internal(format!(
"Can not parse the left side of the condition as a number: {:?}",
provided
"Can not parse the left side of the condition as a number: {provided:?}"
)));
};
provided
}
_ => {
return Err(PostHogEvaluationError::Internal(format!(
"The left side of the condition is not a number or a string: {:?}",
provided
"The left side of the condition is not a number or a string: {provided:?}"
)));
}
};
@@ -218,14 +212,12 @@ impl FeatureStore {
"lt" => Ok(provided < requested),
"gt" => Ok(provided > requested),
op => Err(PostHogEvaluationError::Internal(format!(
"Unsupported operator: {}",
op
"Unsupported operator: {op}"
))),
}
}
_ => Err(PostHogEvaluationError::Internal(format!(
"Unsupported operator: {}",
operator
"Unsupported operator: {operator}"
))),
}
}
@@ -373,8 +365,7 @@ impl FeatureStore {
if let Some(flag_config) = self.flags.get(flag_key) {
if !flag_config.active {
return Err(PostHogEvaluationError::NotAvailable(format!(
"The feature flag is not active: {}",
flag_key
"The feature flag is not active: {flag_key}"
)));
}
let Some(ref multivariate) = flag_config.filters.multivariate else {
@@ -401,8 +392,7 @@ impl FeatureStore {
// This should not happen because the rollout percentage always adds up to 100, but just in case that PostHog
// returned invalid spec, we return an error.
return Err(PostHogEvaluationError::Internal(format!(
"Rollout percentage does not add up to 100: {}",
flag_key
"Rollout percentage does not add up to 100: {flag_key}"
)));
}
GroupEvaluationResult::Unmatched => continue,
@@ -413,8 +403,7 @@ impl FeatureStore {
} else {
// The feature flag is not available yet
Err(PostHogEvaluationError::NotAvailable(format!(
"Not found in the local evaluation spec: {}",
flag_key
"Not found in the local evaluation spec: {flag_key}"
)))
}
}
@@ -440,8 +429,7 @@ impl FeatureStore {
if let Some(flag_config) = self.flags.get(flag_key) {
if !flag_config.active {
return Err(PostHogEvaluationError::NotAvailable(format!(
"The feature flag is not active: {}",
flag_key
"The feature flag is not active: {flag_key}"
)));
}
if flag_config.filters.multivariate.is_some() {
@@ -456,8 +444,7 @@ impl FeatureStore {
match self.evaluate_group(group, hash_on_global_rollout_percentage, properties)? {
GroupEvaluationResult::MatchedAndOverride(_) => {
return Err(PostHogEvaluationError::Internal(format!(
"Boolean flag cannot have overrides: {}",
flag_key
"Boolean flag cannot have overrides: {flag_key}"
)));
}
GroupEvaluationResult::MatchedAndEvaluate => {
@@ -471,8 +458,7 @@ impl FeatureStore {
} else {
// The feature flag is not available yet
Err(PostHogEvaluationError::NotAvailable(format!(
"Not found in the local evaluation spec: {}",
flag_key
"Not found in the local evaluation spec: {flag_key}"
)))
}
}
@@ -483,8 +469,7 @@ impl FeatureStore {
Ok(flag_config.filters.multivariate.is_none())
} else {
Err(PostHogEvaluationError::NotAvailable(format!(
"Not found in the local evaluation spec: {}",
flag_key
"Not found in the local evaluation spec: {flag_key}"
)))
}
}

View File

@@ -198,7 +198,7 @@ impl fmt::Display for CancelKeyData {
// This format is more compact and might work better for logs.
f.debug_tuple("CancelKeyData")
.field(&format_args!("{:x}", id))
.field(&format_args!("{id:x}"))
.finish()
}
}
@@ -291,8 +291,7 @@ impl FeMessage {
let len = (&buf[1..5]).read_u32::<BigEndian>().unwrap();
if len < 4 {
return Err(ProtocolError::Protocol(format!(
"invalid message length {}",
len
"invalid message length {len}"
)));
}
@@ -367,8 +366,7 @@ impl FeStartupPacket {
#[allow(clippy::manual_range_contains)]
if len < 8 || len > MAX_STARTUP_PACKET_LENGTH {
return Err(ProtocolError::Protocol(format!(
"invalid startup packet message length {}",
len
"invalid startup packet message length {len}"
)));
}

View File

@@ -308,7 +308,7 @@ impl ScramSha256 {
let verifier = match parsed {
ServerFinalMessage::Error(e) => {
return Err(io::Error::other(format!("SCRAM error: {}", e)));
return Err(io::Error::other(format!("SCRAM error: {e}")));
}
ServerFinalMessage::Verifier(verifier) => verifier,
};
@@ -343,10 +343,8 @@ impl<'a> Parser<'a> {
match self.it.next() {
Some((_, c)) if c == target => Ok(()),
Some((i, c)) => {
let m = format!(
"unexpected character at byte {}: expected `{}` but got `{}",
i, target, c
);
let m =
format!("unexpected character at byte {i}: expected `{target}` but got `{c}");
Err(io::Error::new(io::ErrorKind::InvalidInput, m))
}
None => Err(io::Error::new(
@@ -412,7 +410,7 @@ impl<'a> Parser<'a> {
match self.it.peek() {
Some(&(i, _)) => Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("unexpected trailing data at byte {}", i),
format!("unexpected trailing data at byte {i}"),
)),
None => Ok(()),
}

View File

@@ -211,7 +211,7 @@ impl Message {
tag => {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("unknown authentication tag `{}`", tag),
format!("unknown authentication tag `{tag}`"),
));
}
},
@@ -238,7 +238,7 @@ impl Message {
tag => {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("unknown message tag `{}`", tag),
format!("unknown message tag `{tag}`"),
));
}
};

View File

@@ -46,7 +46,7 @@ impl fmt::Display for Type {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.schema() {
"public" | "pg_catalog" => {}
schema => write!(fmt, "{}.", schema)?,
schema => write!(fmt, "{schema}.")?,
}
fmt.write_str(self.name())
}

View File

@@ -332,10 +332,10 @@ impl fmt::Display for DbError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "{}: {}", self.severity, self.message)?;
if let Some(detail) = &self.detail {
write!(fmt, "\nDETAIL: {}", detail)?;
write!(fmt, "\nDETAIL: {detail}")?;
}
if let Some(hint) = &self.hint {
write!(fmt, "\nHINT: {}", hint)?;
write!(fmt, "\nHINT: {hint}")?;
}
Ok(())
}
@@ -398,9 +398,9 @@ impl fmt::Display for Error {
Kind::Io => fmt.write_str("error communicating with the server")?,
Kind::UnexpectedMessage => fmt.write_str("unexpected message from server")?,
Kind::Tls => fmt.write_str("error performing TLS handshake")?,
Kind::ToSql(idx) => write!(fmt, "error serializing parameter {}", idx)?,
Kind::FromSql(idx) => write!(fmt, "error deserializing column {}", idx)?,
Kind::Column(column) => write!(fmt, "invalid column `{}`", column)?,
Kind::ToSql(idx) => write!(fmt, "error serializing parameter {idx}")?,
Kind::FromSql(idx) => write!(fmt, "error deserializing column {idx}")?,
Kind::Column(column) => write!(fmt, "invalid column `{column}`")?,
Kind::Closed => fmt.write_str("connection closed")?,
Kind::Db => fmt.write_str("db error")?,
Kind::Parse => fmt.write_str("error parsing response from server")?,
@@ -411,7 +411,7 @@ impl fmt::Display for Error {
Kind::Timeout => fmt.write_str("timeout waiting for server")?,
};
if let Some(ref cause) = self.0.cause {
write!(fmt, ": {}", cause)?;
write!(fmt, ": {cause}")?;
}
Ok(())
}

View File

@@ -156,7 +156,7 @@ impl Row {
{
match self.get_inner(&idx) {
Ok(ok) => ok,
Err(err) => panic!("error retrieving column {}: {}", idx, err),
Err(err) => panic!("error retrieving column {idx}: {err}"),
}
}
@@ -274,7 +274,7 @@ impl SimpleQueryRow {
{
match self.get_inner(&idx) {
Ok(ok) => ok,
Err(err) => panic!("error retrieving column {}: {}", idx, err),
Err(err) => panic!("error retrieving column {idx}: {err}"),
}
}

View File

@@ -400,7 +400,7 @@ impl RemoteStorage for LocalFs {
key
};
let relative_key = format!("{}", relative_key);
let relative_key = format!("{relative_key}");
if relative_key.contains(REMOTE_STORAGE_PREFIX_SEPARATOR) {
let first_part = relative_key
.split(REMOTE_STORAGE_PREFIX_SEPARATOR)
@@ -594,13 +594,9 @@ impl RemoteStorage for LocalFs {
let from_path = from.with_base(&self.storage_root);
let to_path = to.with_base(&self.storage_root);
create_target_directory(&to_path).await?;
fs::copy(&from_path, &to_path).await.with_context(|| {
format!(
"Failed to copy file from '{from_path}' to '{to_path}'",
from_path = from_path,
to_path = to_path
)
})?;
fs::copy(&from_path, &to_path)
.await
.with_context(|| format!("Failed to copy file from '{from_path}' to '{to_path}'"))?;
Ok(())
}
@@ -1183,7 +1179,7 @@ mod fs_tests {
.write(true)
.create_new(true)
.open(path)?;
write!(file_for_writing, "{}", contents)?;
write!(file_for_writing, "{contents}")?;
drop(file_for_writing);
let file_size = path.metadata()?.len() as usize;
Ok((

View File

@@ -193,10 +193,10 @@ mod tests {
})
.unwrap();
println!("members: {}", members);
println!("members: {members}");
let j = serde_json::to_string(&members).expect("failed to serialize");
println!("members json: {}", j);
println!("members json: {j}");
assert_eq!(
j,
r#"[{"id":42,"host":"lala.org","pg_port":5432},{"id":43,"host":"bubu.org","pg_port":5432}]"#

View File

@@ -41,7 +41,7 @@ pub fn report_compact_sources<E: std::error::Error>(e: &E) -> impl std::fmt::Dis
// why is E a generic parameter here? hope that rustc will see through a default
// Error::source implementation and leave the following out if there cannot be any
// sources:
Sources(self.0.source()).try_for_each(|src| write!(f, ": {}", src))
Sources(self.0.source()).try_for_each(|src| write!(f, ": {src}"))
}
}

View File

@@ -135,7 +135,7 @@ impl Debug for Generation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Valid(v) => {
write!(f, "{:08x}", v)
write!(f, "{v:08x}")
}
Self::None => {
write!(f, "<none>")

View File

@@ -280,7 +280,7 @@ impl TryFrom<Option<&str>> for TimelineId {
value
.unwrap_or_default()
.parse::<TimelineId>()
.with_context(|| format!("Could not parse timeline id from {:?}", value))
.with_context(|| format!("Could not parse timeline id from {value:?}"))
}
}

View File

@@ -89,7 +89,7 @@ pub fn wal_stream_connection_config(
.set_password(args.auth_token.map(|s| s.to_owned()));
if let Some(availability_zone) = args.availability_zone {
connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]);
connstr = connstr.extend_options([format!("availability_zone={availability_zone}")]);
}
Ok(connstr)

View File

@@ -196,7 +196,7 @@ impl std::fmt::Display for TenantShardId {
impl std::fmt::Debug for TenantShardId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Debug is the same as Display: the compact hex representation
write!(f, "{}", self)
write!(f, "{self}")
}
}
@@ -284,7 +284,7 @@ impl std::fmt::Display for ShardIndex {
impl std::fmt::Debug for ShardIndex {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Debug is the same as Display: the compact hex representation
write!(f, "{}", self)
write!(f, "{self}")
}
}

View File

@@ -29,7 +29,7 @@ impl ShutdownSignals {
SIGINT => Signal::Interrupt,
SIGTERM => Signal::Terminate,
SIGQUIT => Signal::Quit,
other => panic!("unknown signal: {}", other),
other => panic!("unknown signal: {other}"),
};
handler(signal)?;

View File

@@ -90,8 +90,7 @@ impl Dispatcher {
Err(e) => {
sink.send(Message::Text(Utf8Bytes::from(
serde_json::to_string(&ProtocolResponse::Error(format!(
"Received protocol version range {} which does not overlap with {}",
agent_range, monitor_range
"Received protocol version range {agent_range} which does not overlap with {monitor_range}"
)))
.unwrap(),
)))

View File

@@ -285,7 +285,7 @@ impl FileCacheState {
// why we're constructing the query here.
self.client
.query(
&format!("ALTER SYSTEM SET neon.file_cache_size_limit = {};", num_mb),
&format!("ALTER SYSTEM SET neon.file_cache_size_limit = {num_mb};"),
&[],
)
.await

View File

@@ -64,7 +64,7 @@ async fn download_bench_data(
let temp_dir_parent: Utf8PathBuf = env::current_dir().unwrap().try_into()?;
let temp_dir = camino_tempfile::tempdir_in(temp_dir_parent)?;
eprintln!("Downloading benchmark data to {:?}", temp_dir);
eprintln!("Downloading benchmark data to {temp_dir:?}");
let listing = client
.list(None, ListingMode::NoDelimiter, None, cancel)
@@ -120,7 +120,7 @@ struct BenchmarkMetadata {
}
async fn load_bench_data(path: &Utf8Path, input_size: usize) -> anyhow::Result<BenchmarkData> {
eprintln!("Loading benchmark data from {:?}", path);
eprintln!("Loading benchmark data from {path:?}");
let mut entries = tokio::fs::read_dir(path).await?;
let mut ordered_segment_paths = Vec::new();

View File

@@ -6,6 +6,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// the build then. Anyway, per cargo docs build script shouldn't output to
// anywhere but $OUT_DIR.
tonic_build::compile_protos("proto/interpreted_wal.proto")
.unwrap_or_else(|e| panic!("failed to compile protos {:?}", e));
.unwrap_or_else(|e| panic!("failed to compile protos {e:?}"));
Ok(())
}

View File

@@ -128,6 +128,6 @@ pub fn describe_wal_record(rec: &NeonWalRecord) -> Result<String, DeserializeErr
will_init,
describe_postgres_wal_record(rec)?
)),
_ => Ok(format!("{:?}", rec)),
_ => Ok(format!("{rec:?}")),
}
}

View File

@@ -376,7 +376,7 @@ impl Level {
FATAL => Level::Fatal,
PANIC => Level::Panic,
WPEVENT => Level::WPEvent,
_ => panic!("unknown log level {}", elevel),
_ => panic!("unknown log level {elevel}"),
}
}
}
@@ -446,7 +446,7 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
impl std::fmt::Display for Level {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{:?}", self)
write!(f, "{self:?}")
}
}

View File

@@ -380,7 +380,7 @@ mod tests {
}
fn conn_send_query(&self, _: &mut crate::bindings::Safekeeper, query: &str) -> bool {
println!("conn_send_query: {}", query);
println!("conn_send_query: {query}");
true
}
@@ -399,13 +399,13 @@ mod tests {
) -> crate::bindings::PGAsyncReadResult {
println!("conn_async_read");
let reply = self.next_safekeeper_reply();
println!("conn_async_read result: {:?}", reply);
println!("conn_async_read result: {reply:?}");
vec.extend_from_slice(reply);
crate::bindings::PGAsyncReadResult_PG_ASYNC_READ_SUCCESS
}
fn conn_blocking_write(&self, _: &mut crate::bindings::Safekeeper, buf: &[u8]) -> bool {
println!("conn_blocking_write: {:?}", buf);
println!("conn_blocking_write: {buf:?}");
self.check_walproposer_msg(buf);
true
}
@@ -456,10 +456,7 @@ mod tests {
timeout_millis: i64,
) -> super::WaitResult {
let data = self.wait_events.get();
println!(
"wait_event_set, timeout_millis={}, res={:?}",
timeout_millis, data
);
println!("wait_event_set, timeout_millis={timeout_millis}, res={data:?}");
super::WaitResult::Network(data.sk, data.event_mask)
}
@@ -475,7 +472,7 @@ mod tests {
}
fn log_internal(&self, _wp: &mut crate::bindings::WalProposer, level: Level, msg: &str) {
println!("wp_log[{}] {}", level, msg);
println!("wp_log[{level}] {msg}");
}
fn after_election(&self, _wp: &mut crate::bindings::WalProposer) {

View File

@@ -508,11 +508,11 @@ impl Client {
.expect("Cannot build URL");
path.query_pairs_mut()
.append_pair("recurse", &format!("{}", recurse));
.append_pair("recurse", &format!("{recurse}"));
if let Some(concurrency) = concurrency {
path.query_pairs_mut()
.append_pair("concurrency", &format!("{}", concurrency));
.append_pair("concurrency", &format!("{concurrency}"));
}
self.request(Method::POST, path, ()).await.map(|_| ())

View File

@@ -152,7 +152,7 @@ pub fn draw_history<W: std::io::Write>(history: &[LayerTraceEvent], mut output:
let key_diff = key_end - key_start;
if key_start >= key_end {
panic!("Invalid key range {}-{}", key_start, key_end);
panic!("Invalid key range {key_start}-{key_end}");
}
let lsn_start = lsn_map.map(f.lsn_range.start);
@@ -212,12 +212,12 @@ pub fn draw_history<W: std::io::Write>(history: &[LayerTraceEvent], mut output:
)?;
writeln!(svg, "</line>")?;
}
Ordering::Greater => panic!("Invalid lsn range {}-{}", lsn_start, lsn_end),
Ordering::Greater => panic!("Invalid lsn range {lsn_start}-{lsn_end}"),
}
files_seen.insert(f);
}
writeln!(svg, "{}", EndSvg)?;
writeln!(svg, "{EndSvg}")?;
let mut layer_events_str = String::new();
let mut first = true;

View File

@@ -228,7 +228,7 @@ pub fn main() -> Result<()> {
let lsn_max = lsn_map.len();
if key_start >= key_end {
panic!("Invalid key range {}-{}", key_start, key_end);
panic!("Invalid key range {key_start}-{key_end}");
}
let lsn_start = *lsn_map.get(&lsnr.start).unwrap();
@@ -250,7 +250,7 @@ pub fn main() -> Result<()> {
ymargin = 0.05;
fill = Fill::Color(rgb(0, 0, 0));
}
Ordering::Greater => panic!("Invalid lsn range {}-{}", lsn_start, lsn_end),
Ordering::Greater => panic!("Invalid lsn range {lsn_start}-{lsn_end}"),
}
println!(
@@ -287,10 +287,10 @@ pub fn main() -> Result<()> {
);
}
println!("{}", EndSvg);
println!("{EndSvg}");
eprintln!("num_images: {}", num_images);
eprintln!("num_deltas: {}", num_deltas);
eprintln!("num_images: {num_images}");
eprintln!("num_deltas: {num_deltas}");
Ok(())
}

View File

@@ -372,7 +372,7 @@ impl<const N: usize> std::fmt::Debug for RelTagish<N> {
f.write_char('/')?;
}
first = false;
write!(f, "{}", x)
write!(f, "{x}")
})
}
}

View File

@@ -224,8 +224,7 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
}
}
println!(
"Total delta layers {} image layers {} excess layers {}",
total_delta_layers, total_image_layers, total_excess_layers
"Total delta layers {total_delta_layers} image layers {total_image_layers} excess layers {total_excess_layers}"
);
Ok(())
}

View File

@@ -131,7 +131,7 @@ impl Client {
let domain_stream = response_stream.map(|chunk_res| {
chunk_res.and_then(|proto_chunk| {
proto_chunk.try_into().map_err(|e| {
tonic::Status::internal(format!("Failed to convert response chunk: {}", e))
tonic::Status::internal(format!("Failed to convert response chunk: {e}"))
})
})
});

View File

@@ -62,7 +62,7 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
let tenant_shard_id = TenantShardId::unsharded(timeline.tenant_id);
let timeline_id = timeline.timeline_id;
println!("operating on timeline {}", timeline);
println!("operating on timeline {timeline}");
mgmt_api_client
.set_tenant_config(&TenantConfigRequest {
@@ -75,8 +75,8 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
let items = (0..100)
.map(|id| {
(
format!("pg_logical/mappings/{:03}.{:03}", batch, id),
format!("{:08}", id),
format!("pg_logical/mappings/{batch:03}.{id:03}"),
format!("{id:08}"),
)
})
.collect::<HashMap<_, _>>();

View File

@@ -669,7 +669,7 @@ where
}
// Append dir path for each database
let path = format!("base/{}", dbnode);
let path = format!("base/{dbnode}");
let header = new_tar_header_dir(&path)?;
self.ar
.append(&header, io::empty())
@@ -677,7 +677,7 @@ where
.map_err(|e| BasebackupError::Client(e, "add_dbdir,base"))?;
if let Some(img) = relmap_img {
let dst_path = format!("base/{}/PG_VERSION", dbnode);
let dst_path = format!("base/{dbnode}/PG_VERSION");
let pg_version_str = match self.timeline.pg_version {
14 | 15 => self.timeline.pg_version.to_string(),
@@ -689,7 +689,7 @@ where
.await
.map_err(|e| BasebackupError::Client(e, "add_dbdir,base/PG_VERSION"))?;
let relmap_path = format!("base/{}/pg_filenode.map", dbnode);
let relmap_path = format!("base/{dbnode}/pg_filenode.map");
let header = new_tar_header(&relmap_path, img.len() as u64)?;
self.ar
.append(&header, &img[..])
@@ -714,9 +714,9 @@ where
let crc = crc32c::crc32c(&img[..]);
buf.put_u32_le(crc);
let path = if self.timeline.pg_version < 17 {
format!("pg_twophase/{:>08X}", xid)
format!("pg_twophase/{xid:>08X}")
} else {
format!("pg_twophase/{:>016X}", xid)
format!("pg_twophase/{xid:>016X}")
};
let header = new_tar_header(&path, buf.len() as u64)?;
self.ar
@@ -768,7 +768,7 @@ where
//send wal segment
let segno = self.lsn.segment_number(WAL_SEGMENT_SIZE);
let wal_file_name = XLogFileName(PG_TLI, segno, WAL_SEGMENT_SIZE);
let wal_file_path = format!("pg_wal/{}", wal_file_name);
let wal_file_path = format!("pg_wal/{wal_file_name}");
let header = new_tar_header(&wal_file_path, WAL_SEGMENT_SIZE as u64)?;
let wal_seg = postgres_ffi::generate_wal_segment(

View File

@@ -287,11 +287,11 @@ impl From<GetActiveTenantError> for ApiError {
GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
ApiError::ShuttingDown
}
GetActiveTenantError::WillNotBecomeActive(_) => ApiError::Conflict(format!("{}", e)),
GetActiveTenantError::WillNotBecomeActive(_) => ApiError::Conflict(format!("{e}")),
GetActiveTenantError::Cancelled => ApiError::ShuttingDown,
GetActiveTenantError::NotFound(gte) => gte.into(),
GetActiveTenantError::WaitForActiveTimeout { .. } => {
ApiError::ResourceUnavailable(format!("{}", e).into())
ApiError::ResourceUnavailable(format!("{e}").into())
}
GetActiveTenantError::SwitchedTenant => {
// in our HTTP handlers, this error doesn't happen
@@ -1015,7 +1015,7 @@ async fn get_lsn_by_timestamp_handler(
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let timestamp_raw = must_get_query_param(&request, "timestamp")?;
let timestamp = humantime::parse_rfc3339(&timestamp_raw)
.with_context(|| format!("Invalid time: {:?}", timestamp_raw))
.with_context(|| format!("Invalid time: {timestamp_raw:?}"))
.map_err(ApiError::BadRequest)?;
let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp);
@@ -1110,7 +1110,7 @@ async fn get_timestamp_of_lsn_handler(
json_response(StatusCode::OK, time)
}
None => Err(ApiError::PreconditionFailed(
format!("Timestamp for lsn {} not found", lsn).into(),
format!("Timestamp for lsn {lsn} not found").into(),
)),
}
}
@@ -2421,7 +2421,7 @@ async fn timeline_offload_handler(
}
if let (false, reason) = timeline.can_offload() {
return Err(ApiError::PreconditionFailed(
format!("Timeline::can_offload() check failed: {}", reason) .into(),
format!("Timeline::can_offload() check failed: {reason}") .into(),
));
}
offload_timeline(&tenant, &timeline)

View File

@@ -1727,12 +1727,7 @@ impl Drop for SmgrOpTimer {
impl SmgrOpFlushInProgress {
/// The caller must guarantee that `socket_fd`` outlives this function.
pub(crate) async fn measure<Fut, O>(
self,
started_at: Instant,
mut fut: Fut,
socket_fd: RawFd,
) -> O
pub(crate) async fn measure<Fut, O>(self, started_at: Instant, fut: Fut, socket_fd: RawFd) -> O
where
Fut: std::future::Future<Output = O>,
{

View File

@@ -392,16 +392,14 @@ async fn page_service_conn_main(
} else {
let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
Err(io_error).context(format!(
"Postgres connection error for tenant_id={:?} client at peer_addr={}",
tenant_id, peer_addr
"Postgres connection error for tenant_id={tenant_id:?} client at peer_addr={peer_addr}"
))
}
}
other => {
let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
other.context(format!(
"Postgres query error for tenant_id={:?} client peer_addr={}",
tenant_id, peer_addr
"Postgres query error for tenant_id={tenant_id:?} client peer_addr={peer_addr}"
))
}
}
@@ -2140,8 +2138,7 @@ impl PageServerHandler {
if request_lsn < not_modified_since {
return Err(PageStreamError::BadRequest(
format!(
"invalid request with request LSN {} and not_modified_since {}",
request_lsn, not_modified_since,
"invalid request with request LSN {request_lsn} and not_modified_since {not_modified_since}",
)
.into(),
));

View File

@@ -1185,7 +1185,7 @@ impl Timeline {
}
let origin_id = k.field6 as RepOriginId;
let origin_lsn = Lsn::des(&v)
.with_context(|| format!("decode replorigin value for {}: {v:?}", origin_id))?;
.with_context(|| format!("decode replorigin value for {origin_id}: {v:?}"))?;
if origin_lsn != Lsn::INVALID {
result.insert(origin_id, origin_lsn);
}
@@ -2440,8 +2440,7 @@ impl DatadirModification<'_> {
if path == p {
assert!(
modifying_file.is_none(),
"duplicated entries found for {}",
path
"duplicated entries found for {path}"
);
modifying_file = Some(content);
} else {

View File

@@ -3449,7 +3449,7 @@ impl TenantShard {
use pageserver_api::models::ActivatingFrom;
match &*current_state {
TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => {
panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state);
panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {current_state:?}");
}
TenantState::Attaching => {
*current_state = TenantState::Activating(ActivatingFrom::Attaching);
@@ -6616,7 +6616,7 @@ mod tests {
.put(
*TEST_KEY,
lsn,
&Value::Image(test_img(&format!("foo at {}", lsn))),
&Value::Image(test_img(&format!("foo at {lsn}"))),
ctx,
)
.await?;
@@ -6626,7 +6626,7 @@ mod tests {
.put(
*TEST_KEY,
lsn,
&Value::Image(test_img(&format!("foo at {}", lsn))),
&Value::Image(test_img(&format!("foo at {lsn}"))),
ctx,
)
.await?;
@@ -6640,7 +6640,7 @@ mod tests {
.put(
*TEST_KEY,
lsn,
&Value::Image(test_img(&format!("foo at {}", lsn))),
&Value::Image(test_img(&format!("foo at {lsn}"))),
ctx,
)
.await?;
@@ -6650,7 +6650,7 @@ mod tests {
.put(
*TEST_KEY,
lsn,
&Value::Image(test_img(&format!("foo at {}", lsn))),
&Value::Image(test_img(&format!("foo at {lsn}"))),
ctx,
)
.await?;
@@ -7149,7 +7149,7 @@ mod tests {
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
&Value::Image(test_img(&format!("{blknum} at {lsn}"))),
ctx,
)
.await?;
@@ -7437,7 +7437,7 @@ mod tests {
.put(
gap_at_key,
current_lsn,
&Value::Image(test_img(&format!("{} at {}", gap_at_key, current_lsn))),
&Value::Image(test_img(&format!("{gap_at_key} at {current_lsn}"))),
&ctx,
)
.await?;
@@ -7476,7 +7476,7 @@ mod tests {
.put(
current_key,
current_lsn,
&Value::Image(test_img(&format!("{} at {}", current_key, current_lsn))),
&Value::Image(test_img(&format!("{current_key} at {current_lsn}"))),
&ctx,
)
.await?;
@@ -7584,7 +7584,7 @@ mod tests {
while key < end_key {
current_lsn += 0x10;
let image_value = format!("{} at {}", child_gap_at_key, current_lsn);
let image_value = format!("{child_gap_at_key} at {current_lsn}");
let mut writer = parent_timeline.writer().await;
writer
@@ -7627,7 +7627,7 @@ mod tests {
.put(
key,
current_lsn,
&Value::Image(test_img(&format!("{} at {}", key, current_lsn))),
&Value::Image(test_img(&format!("{key} at {current_lsn}"))),
&ctx,
)
.await?;
@@ -7748,7 +7748,7 @@ mod tests {
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
&Value::Image(test_img(&format!("{blknum} at {lsn}"))),
&ctx,
)
.await?;
@@ -7769,7 +7769,7 @@ mod tests {
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
&Value::Image(test_img(&format!("{blknum} at {lsn}"))),
&ctx,
)
.await?;
@@ -7783,7 +7783,7 @@ mod tests {
test_key.field6 = blknum as u32;
assert_eq!(
tline.get(test_key, lsn, &ctx).await?,
test_img(&format!("{} at {}", blknum, last_lsn))
test_img(&format!("{blknum} at {last_lsn}"))
);
}
@@ -7829,7 +7829,7 @@ mod tests {
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
&Value::Image(test_img(&format!("{blknum} at {lsn}"))),
&ctx,
)
.await?;
@@ -7858,11 +7858,11 @@ mod tests {
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
&Value::Image(test_img(&format!("{blknum} at {lsn}"))),
&ctx,
)
.await?;
println!("updating {} at {}", blknum, lsn);
println!("updating {blknum} at {lsn}");
writer.finish_write(lsn);
drop(writer);
updated[blknum] = lsn;
@@ -7873,7 +7873,7 @@ mod tests {
test_key.field6 = blknum as u32;
assert_eq!(
tline.get(test_key, lsn, &ctx).await?,
test_img(&format!("{} at {}", blknum, last_lsn))
test_img(&format!("{blknum} at {last_lsn}"))
);
}
@@ -7926,11 +7926,11 @@ mod tests {
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} {} at {}", idx, blknum, lsn))),
&Value::Image(test_img(&format!("{idx} {blknum} at {lsn}"))),
&ctx,
)
.await?;
println!("updating [{}][{}] at {}", idx, blknum, lsn);
println!("updating [{idx}][{blknum}] at {lsn}");
writer.finish_write(lsn);
drop(writer);
updated[idx][blknum] = lsn;
@@ -8136,7 +8136,7 @@ mod tests {
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
&Value::Image(test_img(&format!("{blknum} at {lsn}"))),
&ctx,
)
.await?;
@@ -8153,7 +8153,7 @@ mod tests {
test_key.field6 = (blknum * STEP) as u32;
assert_eq!(
tline.get(test_key, lsn, &ctx).await?,
test_img(&format!("{} at {}", blknum, last_lsn))
test_img(&format!("{blknum} at {last_lsn}"))
);
}
@@ -8190,7 +8190,7 @@ mod tests {
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
&Value::Image(test_img(&format!("{blknum} at {lsn}"))),
&ctx,
)
.await?;
@@ -8443,7 +8443,7 @@ mod tests {
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
&Value::Image(test_img(&format!("{blknum} at {lsn}"))),
&ctx,
)
.await?;
@@ -8463,7 +8463,7 @@ mod tests {
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
&Value::Image(test_img(&format!("{blknum} at {lsn}"))),
&ctx,
)
.await?;
@@ -9384,12 +9384,7 @@ mod tests {
let end_lsn = Lsn(0x100);
let image_layers = (0x20..=0x90)
.step_by(0x10)
.map(|n| {
(
Lsn(n),
vec![(key, test_img(&format!("data key at {:x}", n)))],
)
})
.map(|n| (Lsn(n), vec![(key, test_img(&format!("data key at {n:x}")))]))
.collect();
let timeline = tenant

View File

@@ -63,8 +63,7 @@ pub fn check_valid_layermap(metadata: &[LayerName]) -> Option<String> {
&& overlaps_with(&layer.key_range, &other_layer.key_range)
{
let err = format!(
"layer violates the layer map LSN split assumption: layer {} intersects with layer {}",
layer, other_layer
"layer violates the layer map LSN split assumption: layer {layer} intersects with layer {other_layer}"
);
return Some(err);
}

View File

@@ -550,8 +550,7 @@ mod tests {
assert_eq!(
deserialized_metadata.body, expected_metadata.body,
"Metadata of the old version {} should be upgraded to the latest version {}",
METADATA_OLD_FORMAT_VERSION, METADATA_FORMAT_VERSION
"Metadata of the old version {METADATA_OLD_FORMAT_VERSION} should be upgraded to the latest version {METADATA_FORMAT_VERSION}"
);
}

View File

@@ -1427,7 +1427,7 @@ async fn init_timeline_state(
let local_meta = dentry
.metadata()
.await
.fatal_err(&format!("Read metadata on {}", file_path));
.fatal_err(&format!("Read metadata on {file_path}"));
let file_name = file_path.file_name().expect("created it from the dentry");
if crate::is_temporary(&file_path)

View File

@@ -783,7 +783,7 @@ impl DeltaLayer {
ctx,
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
.with_context(|| format!("Failed to open file '{path}'"))?;
let file_id = page_cache::next_file_id();
let block_reader = FileBlockReader::new(&file, file_id);
let summary_blk = block_reader.read_blk(0, ctx).await?;
@@ -1401,7 +1401,7 @@ impl DeltaLayerInner {
match val {
Value::Image(img) => {
let checkpoint = CheckPoint::decode(&img)?;
println!(" CHECKPOINT: {:?}", checkpoint);
println!(" CHECKPOINT: {checkpoint:?}");
}
Value::WalRecord(_rec) => {
println!(" unexpected walrecord value for checkpoint key");

View File

@@ -272,8 +272,7 @@ impl ImageLayer {
conf.timeline_path(&tenant_shard_id, &timeline_id)
.join(format!(
"{fname}.{:x}.{TEMP_FILE_SUFFIX}",
filename_disambiguator
"{fname}.{filename_disambiguator:x}.{TEMP_FILE_SUFFIX}"
))
}
@@ -370,7 +369,7 @@ impl ImageLayer {
ctx,
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
.with_context(|| format!("Failed to open file '{path}'"))?;
let file_id = page_cache::next_file_id();
let block_reader = FileBlockReader::new(&file, file_id);
let summary_blk = block_reader.read_blk(0, ctx).await?;
@@ -1475,7 +1474,7 @@ mod test {
assert_eq!(l1, expect_lsn);
assert_eq!(&i1, i2);
}
(o1, o2) => panic!("iterators length mismatch: {:?}, {:?}", o1, o2),
(o1, o2) => panic!("iterators length mismatch: {o1:?}, {o2:?}"),
}
}
}

View File

@@ -511,7 +511,7 @@ fn inmem_layer_log_display(
start_lsn: Lsn,
end_lsn: Lsn,
) -> std::fmt::Result {
write!(f, "timeline {} in-memory ", timeline)?;
write!(f, "timeline {timeline} in-memory ")?;
inmem_layer_display(f, start_lsn, end_lsn)
}

View File

@@ -380,7 +380,7 @@ impl<B: Buffer> std::fmt::Debug for LogicalReadState<B> {
write!(f, "Ongoing({:?})", BufferDebug::from(b as &dyn Buffer))
}
LogicalReadState::Ok(b) => write!(f, "Ok({:?})", BufferDebug::from(b as &dyn Buffer)),
LogicalReadState::Error(e) => write!(f, "Error({:?})", e),
LogicalReadState::Error(e) => write!(f, "Error({e:?})"),
LogicalReadState::Undefined => write!(f, "Undefined"),
}
}

View File

@@ -105,7 +105,7 @@ impl std::fmt::Display for Layer {
impl std::fmt::Debug for Layer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self)
write!(f, "{self}")
}
}

View File

@@ -178,7 +178,7 @@ pub enum LastImageLayerCreationStatus {
impl std::fmt::Display for ImageLayerCreationMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
write!(f, "{self:?}")
}
}
@@ -632,7 +632,7 @@ pub enum ReadPathLayerId {
impl std::fmt::Display for ReadPathLayerId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ReadPathLayerId::PersistentLayer(key) => write!(f, "{}", key),
ReadPathLayerId::PersistentLayer(key) => write!(f, "{key}"),
ReadPathLayerId::InMemoryLayer(range) => {
write!(f, "in-mem {}..{}", range.start, range.end)
}
@@ -708,7 +708,7 @@ impl MissingKeyError {
impl std::fmt::Debug for MissingKeyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self)
write!(f, "{self}")
}
}
@@ -721,19 +721,19 @@ impl std::fmt::Display for MissingKeyError {
)?;
if let Some(ref ancestor_lsn) = self.ancestor_lsn {
write!(f, ", ancestor {}", ancestor_lsn)?;
write!(f, ", ancestor {ancestor_lsn}")?;
}
if let Some(ref query) = self.query {
write!(f, ", query {}", query)?;
write!(f, ", query {query}")?;
}
if let Some(ref read_path) = self.read_path {
write!(f, "\n{}", read_path)?;
write!(f, "\n{read_path}")?;
}
if let Some(ref backtrace) = self.backtrace {
write!(f, "\n{}", backtrace)?;
write!(f, "\n{backtrace}")?;
}
Ok(())
@@ -7179,9 +7179,7 @@ impl Timeline {
if let Some(end) = layer_end_lsn {
assert!(
end <= last_record_lsn,
"advance last record lsn before inserting a layer, end_lsn={}, last_record_lsn={}",
end,
last_record_lsn,
"advance last record lsn before inserting a layer, end_lsn={end}, last_record_lsn={last_record_lsn}",
);
}

View File

@@ -977,7 +977,7 @@ impl KeyHistoryRetention {
tline
.reconstruct_value(key, lsn, data, RedoAttemptType::GcCompaction)
.await
.with_context(|| format!("verification failed for key {} at lsn {}", key, lsn))?;
.with_context(|| format!("verification failed for key {key} at lsn {lsn}"))?;
Ok(())
}
@@ -2647,15 +2647,15 @@ impl Timeline {
use std::fmt::Write;
let mut output = String::new();
if let Some((key, _, _)) = replay_history.first() {
write!(output, "key={} ", key).unwrap();
write!(output, "key={key} ").unwrap();
let mut cnt = 0;
for (_, lsn, val) in replay_history {
if val.is_image() {
write!(output, "i@{} ", lsn).unwrap();
write!(output, "i@{lsn} ").unwrap();
} else if val.will_init() {
write!(output, "di@{} ", lsn).unwrap();
write!(output, "di@{lsn} ").unwrap();
} else {
write!(output, "d@{} ", lsn).unwrap();
write!(output, "d@{lsn} ").unwrap();
}
cnt += 1;
if cnt >= 128 {

View File

@@ -360,8 +360,7 @@ pub(super) async fn handle_walreceiver_connection(
match raw_wal_start_lsn.cmp(&expected_wal_start) {
std::cmp::Ordering::Greater => {
let msg = format!(
"Gap in streamed WAL: [{}, {})",
expected_wal_start, raw_wal_start_lsn
"Gap in streamed WAL: [{expected_wal_start}, {raw_wal_start_lsn})"
);
critical!("{msg}");
return Err(WalReceiverError::Other(anyhow!(msg)));

View File

@@ -68,16 +68,9 @@ impl<A: Alignment> AlignedBuffer<A> {
assert!(
begin <= end,
"range start must not be greater than end: {:?} <= {:?}",
begin,
end,
);
assert!(
end <= len,
"range end out of bounds: {:?} <= {:?}",
end,
len,
"range start must not be greater than end: {begin:?} <= {end:?}",
);
assert!(end <= len, "range end out of bounds: {end:?} <= {len:?}",);
let begin = self.range.start + begin;
let end = self.range.start + end;

View File

@@ -242,10 +242,7 @@ unsafe impl<A: Alignment> bytes::BufMut for AlignedBufferMut<A> {
/// Panic with a nice error message.
#[cold]
fn panic_advance(idx: usize, len: usize) -> ! {
panic!(
"advance out of bounds: the len is {} but advancing by {}",
len, idx
);
panic!("advance out of bounds: the len is {len} but advancing by {idx}");
}
/// Safety: [`AlignedBufferMut`] has exclusive ownership of the io buffer,

View File

@@ -2108,7 +2108,7 @@ mod tests {
// Check relation content
for blkno in 0..relsize {
let lsn = Lsn(0x20);
let data = format!("foo blk {} at {}", blkno, lsn);
let data = format!("foo blk {blkno} at {lsn}");
assert_eq!(
tline
.get_rel_page_at_lsn(
@@ -2142,7 +2142,7 @@ mod tests {
for blkno in 0..1 {
let lsn = Lsn(0x20);
let data = format!("foo blk {} at {}", blkno, lsn);
let data = format!("foo blk {blkno} at {lsn}");
assert_eq!(
tline
.get_rel_page_at_lsn(
@@ -2167,7 +2167,7 @@ mod tests {
);
for blkno in 0..relsize {
let lsn = Lsn(0x20);
let data = format!("foo blk {} at {}", blkno, lsn);
let data = format!("foo blk {blkno} at {lsn}");
assert_eq!(
tline
.get_rel_page_at_lsn(
@@ -2188,7 +2188,7 @@ mod tests {
let lsn = Lsn(0x80);
let mut m = tline.begin_modification(lsn);
for blkno in 0..relsize {
let data = format!("foo blk {} at {}", blkno, lsn);
let data = format!("foo blk {blkno} at {lsn}");
walingest
.put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
.await?;
@@ -2210,7 +2210,7 @@ mod tests {
// Check relation content
for blkno in 0..relsize {
let lsn = Lsn(0x80);
let data = format!("foo blk {} at {}", blkno, lsn);
let data = format!("foo blk {blkno} at {lsn}");
assert_eq!(
tline
.get_rel_page_at_lsn(
@@ -2414,6 +2414,6 @@ mod tests {
}
let duration = started_at.elapsed();
println!("done in {:?}", duration);
println!("done in {duration:?}");
}
}

View File

@@ -52,8 +52,7 @@ pub(crate) fn apply_in_neon(
let (rel, _) = key.to_rel_block().context("invalid record")?;
assert!(
rel.forknum == VISIBILITYMAP_FORKNUM,
"TruncateVisibilityMap record on unexpected rel {}",
rel
"TruncateVisibilityMap record on unexpected rel {rel}"
);
let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
map[*trunc_byte + 1..].fill(0u8);
@@ -78,8 +77,7 @@ pub(crate) fn apply_in_neon(
let (rel, blknum) = key.to_rel_block().context("invalid record")?;
assert!(
rel.forknum == VISIBILITYMAP_FORKNUM,
"ClearVisibilityMapFlags record on unexpected rel {}",
rel
"ClearVisibilityMapFlags record on unexpected rel {rel}"
);
if let Some(heap_blkno) = *new_heap_blkno {
// Calculate the VM block and offset that corresponds to the heap block.
@@ -124,8 +122,7 @@ pub(crate) fn apply_in_neon(
assert_eq!(
slru_kind,
SlruKind::Clog,
"ClogSetCommitted record with unexpected key {}",
key
"ClogSetCommitted record with unexpected key {key}"
);
for &xid in xids {
let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
@@ -135,15 +132,11 @@ pub(crate) fn apply_in_neon(
// Check that we're modifying the correct CLOG block.
assert!(
segno == expected_segno,
"ClogSetCommitted record for XID {} with unexpected key {}",
xid,
key
"ClogSetCommitted record for XID {xid} with unexpected key {key}"
);
assert!(
blknum == expected_blknum,
"ClogSetCommitted record for XID {} with unexpected key {}",
xid,
key
"ClogSetCommitted record for XID {xid} with unexpected key {key}"
);
transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_COMMITTED, page);
@@ -169,8 +162,7 @@ pub(crate) fn apply_in_neon(
assert_eq!(
slru_kind,
SlruKind::Clog,
"ClogSetAborted record with unexpected key {}",
key
"ClogSetAborted record with unexpected key {key}"
);
for &xid in xids {
let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
@@ -180,15 +172,11 @@ pub(crate) fn apply_in_neon(
// Check that we're modifying the correct CLOG block.
assert!(
segno == expected_segno,
"ClogSetAborted record for XID {} with unexpected key {}",
xid,
key
"ClogSetAborted record for XID {xid} with unexpected key {key}"
);
assert!(
blknum == expected_blknum,
"ClogSetAborted record for XID {} with unexpected key {}",
xid,
key
"ClogSetAborted record for XID {xid} with unexpected key {key}"
);
transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_ABORTED, page);
@@ -199,8 +187,7 @@ pub(crate) fn apply_in_neon(
assert_eq!(
slru_kind,
SlruKind::MultiXactOffsets,
"MultixactOffsetCreate record with unexpected key {}",
key
"MultixactOffsetCreate record with unexpected key {key}"
);
// Compute the block and offset to modify.
// See RecordNewMultiXact in PostgreSQL sources.
@@ -213,15 +200,11 @@ pub(crate) fn apply_in_neon(
let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
assert!(
segno == expected_segno,
"MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
mid,
key
"MultiXactOffsetsCreate record for multi-xid {mid} with unexpected key {key}"
);
assert!(
blknum == expected_blknum,
"MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
mid,
key
"MultiXactOffsetsCreate record for multi-xid {mid} with unexpected key {key}"
);
LittleEndian::write_u32(&mut page[offset..offset + 4], *moff);
@@ -231,8 +214,7 @@ pub(crate) fn apply_in_neon(
assert_eq!(
slru_kind,
SlruKind::MultiXactMembers,
"MultixactMembersCreate record with unexpected key {}",
key
"MultixactMembersCreate record with unexpected key {key}"
);
for (i, member) in members.iter().enumerate() {
let offset = moff + i as u32;
@@ -249,15 +231,11 @@ pub(crate) fn apply_in_neon(
let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
assert!(
segno == expected_segno,
"MultiXactMembersCreate record for offset {} with unexpected key {}",
moff,
key
"MultiXactMembersCreate record for offset {moff} with unexpected key {key}"
);
assert!(
blknum == expected_blknum,
"MultiXactMembersCreate record for offset {} with unexpected key {}",
moff,
key
"MultiXactMembersCreate record for offset {moff} with unexpected key {key}"
);
let mut flagsval = LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);

View File

@@ -159,18 +159,14 @@ static XLogReaderState *reader_state;
* If the Linux uAPI headers don't define the system call number,
* fail the build deliberately rather than ifdef'ing it to ENOSYS.
* We prefer a compile time over a runtime error for walredo.
*
* If, however, we need to build on old systems for development, e.g. Ubuntu 20.04
* with glibc 2.31, provide a NO_CLOSE_RANGE macro for suboptimal implementation
*/
#include <unistd.h>
#include <sys/syscall.h>
#include <errno.h>
static int
close_range_syscall(unsigned int start_fd, unsigned int count, unsigned int flags)
{
return syscall(__NR_close_range, start_fd, count, flags);
}
static PgSeccompRule allowed_syscalls[] =
{
/* Hard requirements */
@@ -213,10 +209,30 @@ enter_seccomp_mode(void)
* it potentially leaked to us, _before_ we start processing potentially dangerous
* wal records. See the comment in the Rust code that launches this process.
*/
if (close_range_syscall(3, ~0U, 0) != 0)
#define START_FD 3
#define STR(s) #s
#ifdef __NR_close_range
if (syscall(__NR_close_range, START_FD, ~0U, 0) != 0)
ereport(FATAL,
(errcode(ERRCODE_SYSTEM_ERROR),
errmsg("seccomp: could not close files >= fd 3")));
errmsg("seccomp: could not close files >= " STR(START_FD))));
#else
// close_range can return EINVAL -- not our case as start_fd and end_fd are hardcoded.
// It doesn't return any other errors if CLOSE_RANGE_UNSHARE is not set so we don't
// report any errors here.
#ifdef NO_CLOSE_RANGE
#warning
"__NR_close_range is not defined which means you're using kernel < 5.9."
"Using suboptimal implementation. Do NOT use this in production"
int fd;
for (fd = START_FD; fd <= INT_MAX; ++fd)
close(fd);
#else
#error
"__NR_close_range is not defined which means you're using kernel < 5.9."
"Define NO_CLOSE_RANGE for local development"
#endif
#endif
#ifdef MALLOC_NO_MMAP
/* Ask glibc not to use mmap() */

View File

@@ -61,6 +61,10 @@
clippy::too_many_lines,
clippy::unused_self
)]
#![allow(
clippy::unsafe_derive_deserialize,
reason = "false positive: https://github.com/rust-lang/rust-clippy/issues/15120"
)]
#![cfg_attr(
any(test, feature = "testing"),
allow(

View File

@@ -206,16 +206,10 @@ impl Storage for FileStorage {
let buf: Vec<u8> = s.write_to_buf()?;
control_partial.write_all(&buf).await.with_context(|| {
format!(
"failed to write safekeeper state into control file at: {}",
control_partial_path
)
format!("failed to write safekeeper state into control file at: {control_partial_path}")
})?;
control_partial.flush().await.with_context(|| {
format!(
"failed to flush safekeeper state into control file at: {}",
control_partial_path
)
format!("failed to flush safekeeper state into control file at: {control_partial_path}")
})?;
let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);

View File

@@ -73,7 +73,7 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
let re = Regex::new(r"START_WAL_PUSH(\s+?\((.*)\))?").unwrap();
let caps = re
.captures(cmd)
.context(format!("failed to parse START_WAL_PUSH command {}", cmd))?;
.context(format!("failed to parse START_WAL_PUSH command {cmd}"))?;
// capture () content
let options = caps.get(2).map(|m| m.as_str()).unwrap_or("");
// default values
@@ -85,24 +85,20 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
}
let mut kvit = kvstr.split_whitespace();
let key = kvit.next().context(format!(
"failed to parse key in kv {} in command {}",
kvstr, cmd
"failed to parse key in kv {kvstr} in command {cmd}"
))?;
let value = kvit.next().context(format!(
"failed to parse value in kv {} in command {}",
kvstr, cmd
"failed to parse value in kv {kvstr} in command {cmd}"
))?;
let value_trimmed = value.trim_matches('\'');
if key == "proto_version" {
proto_version = value_trimmed.parse::<u32>().context(format!(
"failed to parse proto_version value {} in command {}",
value, cmd
"failed to parse proto_version value {value} in command {cmd}"
))?;
}
if key == "allow_timeline_creation" {
allow_timeline_creation = value_trimmed.parse::<bool>().context(format!(
"failed to parse allow_timeline_creation value {} in command {}",
value, cmd
"failed to parse allow_timeline_creation value {value} in command {cmd}"
))?;
}
}
@@ -118,7 +114,7 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
.unwrap();
let caps = re
.captures(cmd)
.context(format!("failed to parse START_REPLICATION command {}", cmd))?;
.context(format!("failed to parse START_REPLICATION command {cmd}"))?;
let start_lsn =
Lsn::from_str(&caps[1]).context("parse start LSN from START_REPLICATION command")?;
let term = if let Some(m) = caps.get(2) {

View File

@@ -64,10 +64,10 @@ impl TermHistory {
for i in 0..n_entries {
let term = bytes
.get_u64_f()
.with_context(|| format!("TermHistory pos {} misses term", i))?;
.with_context(|| format!("TermHistory pos {i} misses term"))?;
let lsn = bytes
.get_u64_f()
.with_context(|| format!("TermHistory pos {} misses lsn", i))?
.with_context(|| format!("TermHistory pos {i} misses lsn"))?
.into();
res.push(TermLsn { term, lsn })
}
@@ -121,9 +121,7 @@ impl TermHistory {
if let Some(sk_th_last) = sk_th.last() {
assert!(
sk_th_last.lsn <= sk_wal_end,
"safekeeper term history end {:?} LSN is higher than WAL end {:?}",
sk_th_last,
sk_wal_end
"safekeeper term history end {sk_th_last:?} LSN is higher than WAL end {sk_wal_end:?}"
);
}
@@ -438,11 +436,11 @@ impl ProposerAcceptorMessage {
for i in 0..members_len {
let id = buf
.get_u64_f()
.with_context(|| format!("reading member {} node_id", i))?;
let host = Self::get_cstr(buf).with_context(|| format!("reading member {} host", i))?;
.with_context(|| format!("reading member {i} node_id"))?;
let host = Self::get_cstr(buf).with_context(|| format!("reading member {i} host"))?;
let pg_port = buf
.get_u16_f()
.with_context(|| format!("reading member {} port", i))?;
.with_context(|| format!("reading member {i} port"))?;
let sk = SafekeeperId {
id: NodeId(id),
host,
@@ -463,12 +461,12 @@ impl ProposerAcceptorMessage {
for i in 0..new_members_len {
let id = buf
.get_u64_f()
.with_context(|| format!("reading new member {} node_id", i))?;
let host = Self::get_cstr(buf)
.with_context(|| format!("reading new member {} host", i))?;
.with_context(|| format!("reading new member {i} node_id"))?;
let host =
Self::get_cstr(buf).with_context(|| format!("reading new member {i} host"))?;
let pg_port = buf
.get_u16_f()
.with_context(|| format!("reading new member {} port", i))?;
.with_context(|| format!("reading new member {i} port"))?;
let sk = SafekeeperId {
id: NodeId(id),
host,
@@ -1508,7 +1506,7 @@ mod tests {
let mut vote_resp = sk.process_msg(&vote_request).await;
match vote_resp.unwrap() {
Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given),
r => panic!("unexpected response: {:?}", r),
r => panic!("unexpected response: {r:?}"),
}
// reboot...
@@ -1523,7 +1521,7 @@ mod tests {
vote_resp = sk.process_msg(&vote_request).await;
match vote_resp.unwrap() {
Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(!resp.vote_given),
r => panic!("unexpected response: {:?}", r),
r => panic!("unexpected response: {r:?}"),
}
}

View File

@@ -342,7 +342,7 @@ where
let bytes_read1 = reader1
.read(&mut buffer1[..bytes_to_read])
.await
.with_context(|| format!("failed to read from reader1 at offset {}", offset))?;
.with_context(|| format!("failed to read from reader1 at offset {offset}"))?;
if bytes_read1 == 0 {
anyhow::bail!("unexpected EOF from reader1 at offset {}", offset);
}
@@ -351,10 +351,7 @@ where
.read_exact(&mut buffer2[..bytes_read1])
.await
.with_context(|| {
format!(
"failed to read {} bytes from reader2 at offset {}",
bytes_read1, offset
)
format!("failed to read {bytes_read1} bytes from reader2 at offset {offset}")
})?;
assert!(bytes_read2 == bytes_read1);

View File

@@ -108,7 +108,7 @@ impl std::fmt::Debug for ManagerCtlMessage {
match self {
ManagerCtlMessage::GuardRequest(_) => write!(f, "GuardRequest"),
ManagerCtlMessage::TryGuardRequest(_) => write!(f, "TryGuardRequest"),
ManagerCtlMessage::GuardDrop(id) => write!(f, "GuardDrop({:?})", id),
ManagerCtlMessage::GuardDrop(id) => write!(f, "GuardDrop({id:?})"),
ManagerCtlMessage::BackupPartialReset(_) => write!(f, "BackupPartialReset"),
}
}

View File

@@ -147,7 +147,7 @@ impl GlobalTimelines {
};
let mut tenant_count = 0;
for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
.with_context(|| format!("failed to list tenants dir {}", tenants_dir))?
.with_context(|| format!("failed to list tenants dir {tenants_dir}"))?
{
match &tenants_dir_entry {
Ok(tenants_dir_entry) => {
@@ -188,7 +188,7 @@ impl GlobalTimelines {
let timelines_dir = get_tenant_dir(&conf, &tenant_id);
for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
.with_context(|| format!("failed to list timelines dir {}", timelines_dir))?
.with_context(|| format!("failed to list timelines dir {timelines_dir}"))?
{
match &timelines_dir_entry {
Ok(timeline_dir_entry) => {

View File

@@ -364,8 +364,7 @@ impl PartialBackup {
// there should always be zero or one uploaded segment
assert!(
new_segments.is_empty(),
"too many uploaded segments: {:?}",
new_segments
"too many uploaded segments: {new_segments:?}"
);
}

View File

@@ -841,7 +841,7 @@ pub(crate) async fn open_wal_file(
// If that failed, try it without the .partial extension.
let pf = tokio::fs::File::open(&wal_file_path)
.await
.with_context(|| format!("failed to open WAL file {:#}", wal_file_path))
.with_context(|| format!("failed to open WAL file {wal_file_path:#}"))
.map_err(|e| {
warn!("{}", e);
e

View File

@@ -33,7 +33,7 @@ impl FormatTime for SimClock {
if let Some(clock) = clock.as_ref() {
let now = clock.now();
write!(w, "[{}]", now)
write!(w, "[{now}]")
} else {
write!(w, "[?]")
}

View File

@@ -257,7 +257,7 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
let estr = e.to_string();
if !estr.contains("finished processing START_REPLICATION") {
warn!("conn {:?} error: {:?}", connection_id, e);
panic!("unexpected error at safekeeper: {:#}", e);
panic!("unexpected error at safekeeper: {e:#}");
}
conns.remove(&connection_id);
break;

View File

@@ -217,7 +217,7 @@ impl TestConfig {
];
let server_ids = [servers[0].id, servers[1].id, servers[2].id];
let safekeepers_addrs = server_ids.map(|id| format!("node:{}", id)).to_vec();
let safekeepers_addrs = server_ids.map(|id| format!("node:{id}")).to_vec();
let ttid = TenantTimelineId::generate();

View File

@@ -523,7 +523,7 @@ impl ApiImpl for SimulationApi {
// Voting bug when safekeeper disconnects after voting
executor::exit(1, msg.to_owned());
}
panic!("unknown FATAL error from walproposer: {}", msg);
panic!("unknown FATAL error from walproposer: {msg}");
}
}
@@ -544,10 +544,7 @@ impl ApiImpl for SimulationApi {
}
}
let msg = format!(
"prop_elected;{};{};{};{}",
prop_lsn, prop_term, prev_lsn, prev_term
);
let msg = format!("prop_elected;{prop_lsn};{prop_term};{prev_lsn};{prev_term}");
debug!(msg);
self.os.log_event(msg);

Some files were not shown because too many files have changed in this diff Show More