From a22be5af72d4c44fcbb320371eaf0f430e008242 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Thu, 27 Feb 2025 10:40:40 +0100 Subject: [PATCH] Migrate the last crates to edition 2024 (#10998) Migrates the remaining crates to edition 2024. We like to stay on the latest edition if possible. There is no functional changes, however some code changes had to be done to accommodate the edition's breaking changes. Like the previous migration PRs, this is comprised of three commits: * the first does the edition update and makes `cargo check`/`cargo clippy` pass. we had to update bindgen to make its output [satisfy the requirements of edition 2024](https://doc.rust-lang.org/edition-guide/rust-2024/unsafe-extern.html) * the second commit does a `cargo fmt` for the new style edition. * the third commit reorders imports as a one-off change. As before, it is entirely optional. Part of #10918 --- Cargo.lock | 16 +++-- Cargo.toml | 4 +- control_plane/src/background_process.rs | 2 +- control_plane/src/bin/neon_local.rs | 43 +++++++------ control_plane/src/broker.rs | 1 - control_plane/src/endpoint.rs | 30 ++++------ control_plane/src/local_env.rs | 30 ++++------ control_plane/src/pageserver.rs | 22 +++---- control_plane/src/postgresql_conf.rs | 5 +- control_plane/src/safekeeper.rs | 9 +-- control_plane/src/storage_controller.rs | 58 +++++++++--------- control_plane/storcon_cli/src/main.rs | 51 ++++++++-------- libs/consumption_metrics/Cargo.toml | 2 +- libs/desim/src/chan.rs | 3 +- libs/desim/src/executor.rs | 12 ++-- libs/desim/src/network.rs | 31 ++++------ libs/desim/src/node_os.rs | 9 +-- libs/desim/src/options.rs | 3 +- libs/desim/src/proto.rs | 3 +- libs/desim/src/time.rs | 14 ++--- libs/desim/src/world.rs | 27 ++++----- libs/desim/tests/reliable_copy_test.rs | 7 ++- libs/http-utils/src/endpoint.rs | 60 ++++++++++--------- libs/http-utils/src/error.rs | 6 +- libs/http-utils/src/failpoints.rs | 7 +-- libs/http-utils/src/json.rs | 2 +- libs/http-utils/src/lib.rs | 2 +- libs/http-utils/src/pprof.rs | 14 ++--- libs/http-utils/src/request.rs | 11 ++-- libs/metrics/src/hll.rs | 32 +++++----- libs/metrics/src/launch_timestamp.rs | 3 +- libs/metrics/src/lib.rs | 42 +++++-------- libs/postgres_backend/src/lib.rs | 28 +++++---- libs/postgres_backend/tests/simple_select.rs | 5 +- libs/postgres_connection/src/lib.rs | 11 ++-- libs/postgres_ffi/benches/waldecoder.rs | 2 +- libs/postgres_ffi/build.rs | 2 +- libs/postgres_ffi/src/lib.rs | 32 +++++----- libs/postgres_ffi/src/pg_constants.rs | 3 +- libs/postgres_ffi/src/walrecord.rs | 27 +++++---- .../wal_craft/src/bin/wal_craft.rs | 6 +- libs/postgres_ffi/wal_craft/src/lib.rs | 19 +++--- libs/pq_proto/src/framed.rs | 7 +-- libs/pq_proto/src/lib.rs | 13 ++-- libs/tenant_size_model/src/calculation.rs | 6 +- libs/tenant_size_model/src/svg.rs | 5 +- libs/tracing-utils/src/http.rs | 4 +- libs/tracing-utils/src/lib.rs | 4 +- libs/utils/benches/benchmarks.rs | 2 +- libs/utils/src/auth.rs | 24 +++++--- libs/utils/src/backoff.rs | 4 +- libs/utils/src/bin_ser.rs | 12 ++-- libs/utils/src/circuit_breaker.rs | 6 +- libs/utils/src/completion.rs | 3 +- libs/utils/src/crashsafe.rs | 8 +-- libs/utils/src/env.rs | 3 +- libs/utils/src/failpoint_support.rs | 8 ++- libs/utils/src/fs_ext.rs | 3 +- libs/utils/src/fs_ext/rename_noreplace.rs | 3 +- libs/utils/src/generation.rs | 4 +- libs/utils/src/guard_arc_swap.rs | 3 +- libs/utils/src/id.rs | 8 +-- libs/utils/src/leaky_bucket.rs | 13 ++-- libs/utils/src/linux_socket_ioctl.rs | 28 +++++---- libs/utils/src/lock_file.rs | 13 ++-- libs/utils/src/logging.rs | 7 ++- libs/utils/src/lsn.rs | 9 +-- libs/utils/src/measured_stream.rs | 3 +- libs/utils/src/pageserver_feedback.rs | 2 +- libs/utils/src/postgres_client.rs | 2 +- libs/utils/src/rate_limit.rs | 3 +- libs/utils/src/sentry_init.rs | 2 +- libs/utils/src/seqwait.rs | 10 ++-- libs/utils/src/serde_percent.rs | 6 +- libs/utils/src/shard.rs | 9 +-- libs/utils/src/signals.rs | 4 +- libs/utils/src/simple_rcu.rs | 6 +- libs/utils/src/sync/gate.rs | 10 +--- libs/utils/src/sync/heavier_once_cell.rs | 16 +++-- libs/utils/src/sync/spsc_fold.rs | 3 +- libs/utils/src/tcp_listener.rs | 9 ++- libs/utils/src/tracing_span_assert.rs | 10 ++-- libs/utils/src/try_rcu.rs | 6 +- libs/utils/src/vec_map.rs | 7 ++- libs/utils/src/zstd.rs | 17 ++---- libs/utils/tests/bin_ser_test.rs | 3 +- .../benches/bench_interpret_wal.rs | 28 +++++---- libs/wal_decoder/src/decoder.rs | 5 +- libs/wal_decoder/src/serialized_batch.rs | 17 +++--- libs/wal_decoder/src/wire_format.rs | 7 +-- libs/walproposer/build.rs | 6 +- libs/walproposer/src/api_bindings.rs | 27 +++------ libs/walproposer/src/walproposer.rs | 35 +++++------ pageserver/client/src/mgmt_api.rs | 16 +++-- pageserver/client/src/page_service.rs | 21 +++---- .../src/bin/compaction-simulator.rs | 11 ++-- pageserver/compaction/src/compact_tiered.rs | 22 +++---- pageserver/compaction/src/helpers.rs | 16 ++--- pageserver/compaction/src/identify_levels.rs | 15 +++-- pageserver/compaction/src/interface.rs | 7 ++- pageserver/compaction/src/simulator.rs | 15 ++--- pageserver/compaction/src/simulator/draw.rs | 16 ++--- pageserver/ctl/src/draw_timeline_dir.rs | 16 ++--- pageserver/ctl/src/key.rs | 26 +++++--- pageserver/ctl/src/layer_map_analyzer.rs | 22 +++---- pageserver/ctl/src/layers.rs | 5 +- pageserver/ctl/src/main.rs | 36 ++++++----- pageserver/pagebench/src/cmd/aux_files.rs | 8 +-- pageserver/pagebench/src/cmd/basebackup.rs | 24 ++++---- .../pagebench/src/cmd/getpage_latest_lsn.rs | 28 ++++----- .../src/cmd/ondemand_download_churn.rs | 22 +++---- .../cmd/trigger_initial_size_calculation.rs | 3 +- safekeeper/client/src/mgmt_api.rs | 9 ++- storage_controller/client/src/control_api.rs | 3 +- storage_controller/src/service.rs | 2 +- 115 files changed, 723 insertions(+), 769 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7d11f2b7fc..293ed465ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -984,9 +984,9 @@ dependencies = [ [[package]] name = "bindgen" -version = "0.70.1" +version = "0.71.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f49d8fed880d473ea71efb9bf597651e77201bdd4893efe54c9e5d65ae04ce6f" +checksum = "5f58bf3d7db68cfbac37cfc485a8d711e87e064c3d0fe0435b92f7a407f9d6b3" dependencies = [ "bitflags 2.8.0", "cexpr", @@ -997,7 +997,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "rustc-hash", + "rustc-hash 2.1.1", "shlex", "syn 2.0.90", ] @@ -3537,7 +3537,7 @@ dependencies = [ "measured-derive", "memchr", "parking_lot 0.12.1", - "rustc-hash", + "rustc-hash 1.1.0", "ryu", ] @@ -5012,7 +5012,7 @@ dependencies = [ "reqwest-tracing", "rsa", "rstest", - "rustc-hash", + "rustc-hash 1.1.0", "rustls 0.23.18", "rustls-native-certs 0.8.0", "rustls-pemfile 2.1.1", @@ -5630,6 +5630,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +[[package]] +name = "rustc-hash" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" + [[package]] name = "rustc_version" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index 223ff4249e..ff45d46a47 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,7 +43,7 @@ members = [ ] [workspace.package] -edition = "2021" +edition = "2024" license = "Apache-2.0" ## All dependency versions, used in the project @@ -70,7 +70,7 @@ aws-types = "1.3" axum = { version = "0.8.1", features = ["ws"] } base64 = "0.13.0" bincode = "1.3" -bindgen = "0.70" +bindgen = "0.71" bit_field = "0.10.2" bstr = "1.0" byteorder = "1.4" diff --git a/control_plane/src/background_process.rs b/control_plane/src/background_process.rs index c668e68402..1eac4f7ff0 100644 --- a/control_plane/src/background_process.rs +++ b/control_plane/src/background_process.rs @@ -25,7 +25,7 @@ use anyhow::Context; use camino::{Utf8Path, Utf8PathBuf}; use nix::errno::Errno; use nix::fcntl::{FcntlArg, FdFlag}; -use nix::sys::signal::{kill, Signal}; +use nix::sys::signal::{Signal, kill}; use nix::unistd::Pid; use utils::pid_file::{self, PidFileRead}; diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 7d908ccae9..f258025428 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -5,7 +5,16 @@ //! easier to work with locally. The python tests in `test_runner` //! rely on `neon_local` to set up the environment for each test. //! -use anyhow::{anyhow, bail, Context, Result}; +use std::borrow::Cow; +use std::collections::{BTreeSet, HashMap}; +use std::fs::File; +use std::os::fd::AsRawFd; +use std::path::PathBuf; +use std::process::exit; +use std::str::FromStr; +use std::time::Duration; + +use anyhow::{Context, Result, anyhow, bail}; use clap::Parser; use compute_api::spec::ComputeMode; use control_plane::endpoint::ComputeControlPlane; @@ -19,7 +28,7 @@ use control_plane::storage_controller::{ NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController, }; use control_plane::{broker, local_env}; -use nix::fcntl::{flock, FlockArg}; +use nix::fcntl::{FlockArg, flock}; use pageserver_api::config::{ DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT, DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT, @@ -35,23 +44,13 @@ use safekeeper_api::{ DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT, DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT, }; -use std::borrow::Cow; -use std::collections::{BTreeSet, HashMap}; -use std::fs::File; -use std::os::fd::AsRawFd; -use std::path::PathBuf; -use std::process::exit; -use std::str::FromStr; -use std::time::Duration; use storage_broker::DEFAULT_LISTEN_ADDR as DEFAULT_BROKER_ADDR; use tokio::task::JoinSet; use url::Host; -use utils::{ - auth::{Claims, Scope}, - id::{NodeId, TenantId, TenantTimelineId, TimelineId}, - lsn::Lsn, - project_git_version, -}; +use utils::auth::{Claims, Scope}; +use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId}; +use utils::lsn::Lsn; +use utils::project_git_version; // Default id of a safekeeper node, if not specified on the command line. const DEFAULT_SAFEKEEPER_ID: NodeId = NodeId(1); @@ -921,7 +920,9 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result { let init_conf: NeonLocalInitConf = if let Some(config_path) = &args.config { // User (likely the Python test suite) provided a description of the environment. if args.num_pageservers.is_some() { - bail!("Cannot specify both --num-pageservers and --config, use key `pageservers` in the --config file instead"); + bail!( + "Cannot specify both --num-pageservers and --config, use key `pageservers` in the --config file instead" + ); } // load and parse the file let contents = std::fs::read_to_string(config_path).with_context(|| { @@ -1315,10 +1316,14 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res match (mode, args.hot_standby) { (ComputeMode::Static(_), true) => { - bail!("Cannot start a node in hot standby mode when it is already configured as a static replica") + bail!( + "Cannot start a node in hot standby mode when it is already configured as a static replica" + ) } (ComputeMode::Primary, true) => { - bail!("Cannot start a node as a hot standby replica, it is already configured as primary node") + bail!( + "Cannot start a node as a hot standby replica, it is already configured as primary node" + ) } _ => {} } diff --git a/control_plane/src/broker.rs b/control_plane/src/broker.rs index c8ac5d8981..1b507bb384 100644 --- a/control_plane/src/broker.rs +++ b/control_plane/src/broker.rs @@ -8,7 +8,6 @@ use std::time::Duration; use anyhow::Context; - use camino::Utf8PathBuf; use crate::{background_process, local_env}; diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 407578abb8..50ccca36fe 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -37,27 +37,20 @@ //! ``` //! use std::collections::BTreeMap; -use std::net::IpAddr; -use std::net::Ipv4Addr; -use std::net::SocketAddr; -use std::net::TcpStream; +use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}; use std::path::PathBuf; use std::process::Command; use std::str::FromStr; use std::sync::Arc; -use std::time::Duration; -use std::time::SystemTime; -use std::time::UNIX_EPOCH; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use anyhow::{anyhow, bail, Context, Result}; +use anyhow::{Context, Result, anyhow, bail}; use compute_api::requests::ConfigurationRequest; -use compute_api::responses::ComputeCtlConfig; -use compute_api::spec::Database; -use compute_api::spec::PgIdent; -use compute_api::spec::RemoteExtSpec; -use compute_api::spec::Role; -use nix::sys::signal::kill; -use nix::sys::signal::Signal; +use compute_api::responses::{ComputeCtlConfig, ComputeStatus, ComputeStatusResponse}; +use compute_api::spec::{ + Cluster, ComputeFeature, ComputeMode, ComputeSpec, Database, PgIdent, RemoteExtSpec, Role, +}; +use nix::sys::signal::{Signal, kill}; use pageserver_api::shard::ShardStripeSize; use reqwest::header::CONTENT_TYPE; use serde::{Deserialize, Serialize}; @@ -69,9 +62,6 @@ use crate::local_env::LocalEnv; use crate::postgresql_conf::PostgresConf; use crate::storage_controller::StorageController; -use compute_api::responses::{ComputeStatus, ComputeStatusResponse}; -use compute_api::spec::{Cluster, ComputeFeature, ComputeMode, ComputeSpec}; - // contents of a endpoint.json file #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)] pub struct EndpointConf { @@ -237,7 +227,9 @@ impl ComputeControlPlane { }); if let Some((key, _)) = duplicates.next() { - bail!("attempting to create a duplicate primary endpoint on tenant {tenant_id}, timeline {timeline_id}: endpoint {key:?} exists already. please don't do this, it is not supported."); + bail!( + "attempting to create a duplicate primary endpoint on tenant {tenant_id}, timeline {timeline_id}: endpoint {key:?} exists already. please don't do this, it is not supported." + ); } } Ok(()) diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 2fe4cd5202..f4026efbbf 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -3,28 +3,22 @@ //! Now it also provides init method which acts like a stub for proper installation //! script which will use local paths. -use anyhow::{bail, Context}; +use std::collections::HashMap; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::path::{Path, PathBuf}; +use std::process::{Command, Stdio}; +use std::time::Duration; +use std::{env, fs}; +use anyhow::{Context, bail}; use clap::ValueEnum; use postgres_backend::AuthType; use reqwest::Url; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::env; -use std::fs; -use std::net::IpAddr; -use std::net::Ipv4Addr; -use std::net::SocketAddr; -use std::path::{Path, PathBuf}; -use std::process::{Command, Stdio}; -use std::time::Duration; -use utils::{ - auth::{encode_from_key_file, Claims}, - id::{NodeId, TenantId, TenantTimelineId, TimelineId}, -}; +use utils::auth::{Claims, encode_from_key_file}; +use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId}; -use crate::pageserver::PageServerNode; -use crate::pageserver::PAGESERVER_REMOTE_STORAGE_DIR; +use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode}; use crate::safekeeper::SafekeeperNode; pub const DEFAULT_PG_VERSION: u32 = 16; @@ -465,7 +459,9 @@ impl LocalEnv { if old_timeline_id == &timeline_id { Ok(()) } else { - bail!("branch '{branch_name}' is already mapped to timeline {old_timeline_id}, cannot map to another timeline {timeline_id}"); + bail!( + "branch '{branch_name}' is already mapped to timeline {old_timeline_id}, cannot map to another timeline {timeline_id}" + ); } } else { existing_values.push((tenant_id, timeline_id)); diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 2bf89b7bfa..39656bdbbe 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -7,7 +7,6 @@ //! ``` //! use std::collections::HashMap; - use std::io; use std::io::Write; use std::num::NonZeroU64; @@ -15,22 +14,19 @@ use std::path::PathBuf; use std::str::FromStr; use std::time::Duration; -use anyhow::{bail, Context}; +use anyhow::{Context, bail}; use camino::Utf8PathBuf; use pageserver_api::models::{self, TenantInfo, TimelineInfo}; use pageserver_api::shard::TenantShardId; use pageserver_client::mgmt_api; use postgres_backend::AuthType; -use postgres_connection::{parse_host_port, PgConnectionConfig}; +use postgres_connection::{PgConnectionConfig, parse_host_port}; use utils::auth::{Claims, Scope}; -use utils::id::NodeId; -use utils::{ - id::{TenantId, TimelineId}, - lsn::Lsn, -}; +use utils::id::{NodeId, TenantId, TimelineId}; +use utils::lsn::Lsn; -use crate::local_env::{NeonLocalInitPageserverConf, PageServerConf}; -use crate::{background_process, local_env::LocalEnv}; +use crate::background_process; +use crate::local_env::{LocalEnv, NeonLocalInitPageserverConf, PageServerConf}; /// Directory within .neon which will be used by default for LocalFs remote storage. pub const PAGESERVER_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/pageserver"; @@ -81,7 +77,11 @@ impl PageServerNode { &self, conf: NeonLocalInitPageserverConf, ) -> anyhow::Result { - assert_eq!(&PageServerConf::from(&conf), &self.conf, "during neon_local init, we derive the runtime state of ps conf (self.conf) from the --config flag fully"); + assert_eq!( + &PageServerConf::from(&conf), + &self.conf, + "during neon_local init, we derive the runtime state of ps conf (self.conf) from the --config flag fully" + ); // TODO(christian): instead of what we do here, create a pageserver_api::config::ConfigToml (PR #7656) diff --git a/control_plane/src/postgresql_conf.rs b/control_plane/src/postgresql_conf.rs index 5aee12dc97..a824af9490 100644 --- a/control_plane/src/postgresql_conf.rs +++ b/control_plane/src/postgresql_conf.rs @@ -1,3 +1,6 @@ +use std::collections::HashMap; +use std::fmt; + /// /// Module for parsing postgresql.conf file. /// @@ -6,8 +9,6 @@ /// funny stuff like include-directives or funny escaping. use once_cell::sync::Lazy; use regex::Regex; -use std::collections::HashMap; -use std::fmt; /// In-memory representation of a postgresql.conf file #[derive(Default, Debug)] diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index ce7751fb14..70915d5aaf 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -14,18 +14,15 @@ use std::{io, result}; use anyhow::Context; use camino::Utf8PathBuf; +use http_utils::error::HttpErrorBody; use postgres_connection::PgConnectionConfig; use reqwest::{IntoUrl, Method}; use thiserror::Error; - -use http_utils::error::HttpErrorBody; use utils::auth::{Claims, Scope}; use utils::id::NodeId; -use crate::{ - background_process, - local_env::{LocalEnv, SafekeeperConf}, -}; +use crate::background_process; +use crate::local_env::{LocalEnv, SafekeeperConf}; #[derive(Error, Debug)] pub enum SafekeeperHttpError { diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs index 0fadb9c5fe..16e12f4e02 100644 --- a/control_plane/src/storage_controller.rs +++ b/control_plane/src/storage_controller.rs @@ -1,44 +1,39 @@ -use crate::{ - background_process, - local_env::{LocalEnv, NeonStorageControllerConf}, -}; +use std::ffi::OsStr; +use std::fs; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::process::ExitStatus; +use std::str::FromStr; +use std::sync::OnceLock; +use std::time::{Duration, Instant}; + use camino::{Utf8Path, Utf8PathBuf}; use hyper0::Uri; use nix::unistd::Pid; -use pageserver_api::{ - controller_api::{ - NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest, - TenantCreateResponse, TenantLocateResponse, TenantShardMigrateRequest, - TenantShardMigrateResponse, - }, - models::{ - TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo, - }, - shard::{ShardStripeSize, TenantShardId}, +use pageserver_api::controller_api::{ + NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest, + TenantCreateResponse, TenantLocateResponse, TenantShardMigrateRequest, + TenantShardMigrateResponse, }; +use pageserver_api::models::{ + TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo, +}; +use pageserver_api::shard::{ShardStripeSize, TenantShardId}; use pageserver_client::mgmt_api::ResponseErrorMessageExt; use postgres_backend::AuthType; use reqwest::Method; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use std::{ - ffi::OsStr, - fs, - net::SocketAddr, - path::PathBuf, - process::ExitStatus, - str::FromStr, - sync::OnceLock, - time::{Duration, Instant}, -}; +use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; use tokio::process::Command; use tracing::instrument; use url::Url; -use utils::{ - auth::{encode_from_key_file, Claims, Scope}, - id::{NodeId, TenantId}, -}; +use utils::auth::{Claims, Scope, encode_from_key_file}; +use utils::id::{NodeId, TenantId}; use whoami::username; +use crate::background_process; +use crate::local_env::{LocalEnv, NeonStorageControllerConf}; + pub struct StorageController { env: LocalEnv, private_key: Option>, @@ -96,7 +91,8 @@ pub struct AttachHookRequest { #[derive(Serialize, Deserialize)] pub struct AttachHookResponse { - pub gen: Option, + #[serde(rename = "gen")] + pub generation: Option, } #[derive(Serialize, Deserialize)] @@ -779,7 +775,7 @@ impl StorageController { ) .await?; - Ok(response.gen) + Ok(response.generation) } #[instrument(skip(self))] diff --git a/control_plane/storcon_cli/src/main.rs b/control_plane/storcon_cli/src/main.rs index 40b86e4110..2e2c22c791 100644 --- a/control_plane/storcon_cli/src/main.rs +++ b/control_plane/storcon_cli/src/main.rs @@ -1,34 +1,27 @@ -use futures::StreamExt; -use std::{ - collections::{HashMap, HashSet}, - str::FromStr, - time::Duration, -}; +use std::collections::{HashMap, HashSet}; +use std::str::FromStr; +use std::time::Duration; use clap::{Parser, Subcommand}; -use pageserver_api::{ - controller_api::{ - AvailabilityZone, NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse, - SafekeeperDescribeResponse, SafekeeperSchedulingPolicyRequest, ShardSchedulingPolicy, - ShardsPreferredAzsRequest, ShardsPreferredAzsResponse, SkSchedulingPolicy, - TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest, - }, - models::{ - EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary, - ShardParameters, TenantConfig, TenantConfigPatchRequest, TenantConfigRequest, - TenantShardSplitRequest, TenantShardSplitResponse, - }, - shard::{ShardStripeSize, TenantShardId}, +use futures::StreamExt; +use pageserver_api::controller_api::{ + AvailabilityZone, NodeAvailabilityWrapper, NodeConfigureRequest, NodeDescribeResponse, + NodeRegisterRequest, NodeSchedulingPolicy, NodeShardResponse, PlacementPolicy, + SafekeeperDescribeResponse, SafekeeperSchedulingPolicyRequest, ShardSchedulingPolicy, + ShardsPreferredAzsRequest, ShardsPreferredAzsResponse, SkSchedulingPolicy, TenantCreateRequest, + TenantDescribeResponse, TenantPolicyRequest, TenantShardMigrateRequest, + TenantShardMigrateResponse, }; +use pageserver_api::models::{ + EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary, ShardParameters, + TenantConfig, TenantConfigPatchRequest, TenantConfigRequest, TenantShardSplitRequest, + TenantShardSplitResponse, +}; +use pageserver_api::shard::{ShardStripeSize, TenantShardId}; use pageserver_client::mgmt_api::{self}; use reqwest::{Method, StatusCode, Url}; -use utils::id::{NodeId, TenantId, TimelineId}; - -use pageserver_api::controller_api::{ - NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy, - TenantShardMigrateRequest, TenantShardMigrateResponse, -}; use storage_controller_client::control_api::Client; +use utils::id::{NodeId, TenantId, TimelineId}; #[derive(Subcommand, Debug)] enum Command { @@ -921,7 +914,9 @@ async fn main() -> anyhow::Result<()> { } Command::TenantDrop { tenant_id, unclean } => { if !unclean { - anyhow::bail!("This command is not a tenant deletion, and uncleanly drops all controller state for the tenant. If you know what you're doing, add `--unclean` to proceed.") + anyhow::bail!( + "This command is not a tenant deletion, and uncleanly drops all controller state for the tenant. If you know what you're doing, add `--unclean` to proceed." + ) } storcon_client .dispatch::<(), ()>( @@ -933,7 +928,9 @@ async fn main() -> anyhow::Result<()> { } Command::NodeDrop { node_id, unclean } => { if !unclean { - anyhow::bail!("This command is not a clean node decommission, and uncleanly drops all controller state for the node, without checking if any tenants still refer to it. If you know what you're doing, add `--unclean` to proceed.") + anyhow::bail!( + "This command is not a clean node decommission, and uncleanly drops all controller state for the node, without checking if any tenants still refer to it. If you know what you're doing, add `--unclean` to proceed." + ) } storcon_client .dispatch::<(), ()>(Method::POST, format!("debug/v1/node/{node_id}/drop"), None) diff --git a/libs/consumption_metrics/Cargo.toml b/libs/consumption_metrics/Cargo.toml index 0e517e3856..77f130950e 100644 --- a/libs/consumption_metrics/Cargo.toml +++ b/libs/consumption_metrics/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "consumption_metrics" version = "0.1.0" -edition = "2021" +edition = "2024" license = "Apache-2.0" [dependencies] diff --git a/libs/desim/src/chan.rs b/libs/desim/src/chan.rs index 6661d59871..8882cd3b56 100644 --- a/libs/desim/src/chan.rs +++ b/libs/desim/src/chan.rs @@ -1,4 +1,5 @@ -use std::{collections::VecDeque, sync::Arc}; +use std::collections::VecDeque; +use std::sync::Arc; use parking_lot::{Mutex, MutexGuard}; diff --git a/libs/desim/src/executor.rs b/libs/desim/src/executor.rs index 9d44bd7741..df8b071c06 100644 --- a/libs/desim/src/executor.rs +++ b/libs/desim/src/executor.rs @@ -1,11 +1,7 @@ -use std::{ - panic::AssertUnwindSafe, - sync::{ - atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering}, - mpsc, Arc, OnceLock, - }, - thread::JoinHandle, -}; +use std::panic::AssertUnwindSafe; +use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, Ordering}; +use std::sync::{Arc, OnceLock, mpsc}; +use std::thread::JoinHandle; use tracing::{debug, error, trace}; diff --git a/libs/desim/src/network.rs b/libs/desim/src/network.rs index e15a714daa..cf096dba80 100644 --- a/libs/desim/src/network.rs +++ b/libs/desim/src/network.rs @@ -1,26 +1,19 @@ -use std::{ - cmp::Ordering, - collections::{BinaryHeap, VecDeque}, - fmt::{self, Debug}, - ops::DerefMut, - sync::{mpsc, Arc}, -}; +use std::cmp::Ordering; +use std::collections::{BinaryHeap, VecDeque}; +use std::fmt::{self, Debug}; +use std::ops::DerefMut; +use std::sync::{Arc, mpsc}; -use parking_lot::{ - lock_api::{MappedMutexGuard, MutexGuard}, - Mutex, RawMutex, -}; +use parking_lot::lock_api::{MappedMutexGuard, MutexGuard}; +use parking_lot::{Mutex, RawMutex}; use rand::rngs::StdRng; use tracing::debug; -use crate::{ - executor::{self, ThreadContext}, - options::NetworkOptions, - proto::NetEvent, - proto::NodeEvent, -}; - -use super::{chan::Chan, proto::AnyMessage}; +use super::chan::Chan; +use super::proto::AnyMessage; +use crate::executor::{self, ThreadContext}; +use crate::options::NetworkOptions; +use crate::proto::{NetEvent, NodeEvent}; pub struct NetworkTask { options: Arc, diff --git a/libs/desim/src/node_os.rs b/libs/desim/src/node_os.rs index 7744a9f5e1..e0cde7b284 100644 --- a/libs/desim/src/node_os.rs +++ b/libs/desim/src/node_os.rs @@ -2,14 +2,11 @@ use std::sync::Arc; use rand::Rng; +use super::chan::Chan; +use super::network::TCP; +use super::world::{Node, NodeId, World}; use crate::proto::NodeEvent; -use super::{ - chan::Chan, - network::TCP, - world::{Node, NodeId, World}, -}; - /// Abstraction with all functions (aka syscalls) available to the node. #[derive(Clone)] pub struct NodeOs { diff --git a/libs/desim/src/options.rs b/libs/desim/src/options.rs index 5da7c2c482..9b1a42fd28 100644 --- a/libs/desim/src/options.rs +++ b/libs/desim/src/options.rs @@ -1,4 +1,5 @@ -use rand::{rngs::StdRng, Rng}; +use rand::Rng; +use rand::rngs::StdRng; /// Describes random delays and failures. Delay will be uniformly distributed in [min, max]. /// Connection failure will occur with the probablity fail_prob. diff --git a/libs/desim/src/proto.rs b/libs/desim/src/proto.rs index 92a7e8a27d..31bc29e6a6 100644 --- a/libs/desim/src/proto.rs +++ b/libs/desim/src/proto.rs @@ -3,7 +3,8 @@ use std::fmt::Debug; use bytes::Bytes; use utils::lsn::Lsn; -use crate::{network::TCP, world::NodeId}; +use crate::network::TCP; +use crate::world::NodeId; /// Internal node events. #[derive(Debug)] diff --git a/libs/desim/src/time.rs b/libs/desim/src/time.rs index 7ce605bda8..350d182cc3 100644 --- a/libs/desim/src/time.rs +++ b/libs/desim/src/time.rs @@ -1,12 +1,8 @@ -use std::{ - cmp::Ordering, - collections::BinaryHeap, - ops::DerefMut, - sync::{ - atomic::{AtomicU32, AtomicU64}, - Arc, - }, -}; +use std::cmp::Ordering; +use std::collections::BinaryHeap; +use std::ops::DerefMut; +use std::sync::Arc; +use std::sync::atomic::{AtomicU32, AtomicU64}; use parking_lot::Mutex; use tracing::trace; diff --git a/libs/desim/src/world.rs b/libs/desim/src/world.rs index 7d60be04b5..576ba89cd7 100644 --- a/libs/desim/src/world.rs +++ b/libs/desim/src/world.rs @@ -1,19 +1,18 @@ +use std::ops::DerefMut; +use std::sync::{Arc, mpsc}; + use parking_lot::Mutex; -use rand::{rngs::StdRng, SeedableRng}; -use std::{ - ops::DerefMut, - sync::{mpsc, Arc}, -}; +use rand::SeedableRng; +use rand::rngs::StdRng; -use crate::{ - executor::{ExternalHandle, Runtime}, - network::NetworkTask, - options::NetworkOptions, - proto::{NodeEvent, SimEvent}, - time::Timing, -}; - -use super::{chan::Chan, network::TCP, node_os::NodeOs}; +use super::chan::Chan; +use super::network::TCP; +use super::node_os::NodeOs; +use crate::executor::{ExternalHandle, Runtime}; +use crate::network::NetworkTask; +use crate::options::NetworkOptions; +use crate::proto::{NodeEvent, SimEvent}; +use crate::time::Timing; pub type NodeId = u32; diff --git a/libs/desim/tests/reliable_copy_test.rs b/libs/desim/tests/reliable_copy_test.rs index cf7bff8f5a..1ddf9844de 100644 --- a/libs/desim/tests/reliable_copy_test.rs +++ b/libs/desim/tests/reliable_copy_test.rs @@ -1,14 +1,15 @@ //! Simple test to verify that simulator is working. #[cfg(test)] mod reliable_copy_test { + use std::sync::Arc; + use anyhow::Result; use desim::executor::{self, PollSome}; + use desim::node_os::NodeOs; use desim::options::{Delay, NetworkOptions}; - use desim::proto::{NetEvent, NodeEvent, ReplCell}; + use desim::proto::{AnyMessage, NetEvent, NodeEvent, ReplCell}; use desim::world::{NodeId, World}; - use desim::{node_os::NodeOs, proto::AnyMessage}; use parking_lot::Mutex; - use std::sync::Arc; use tracing::info; /// Disk storage trait and implementation. diff --git a/libs/http-utils/src/endpoint.rs b/libs/http-utils/src/endpoint.rs index be97b341d1..6128113580 100644 --- a/libs/http-utils/src/endpoint.rs +++ b/libs/http-utils/src/endpoint.rs @@ -1,30 +1,30 @@ -use crate::error::{api_error_handler, route_error_handler, ApiError}; -use crate::pprof; -use crate::request::{get_query_param, parse_query_param}; -use ::pprof::protos::Message as _; -use ::pprof::ProfilerGuardBuilder; -use anyhow::{anyhow, Context}; -use bytes::{Bytes, BytesMut}; -use hyper::header::{HeaderName, AUTHORIZATION, CONTENT_DISPOSITION}; -use hyper::http::HeaderValue; -use hyper::Method; -use hyper::{header::CONTENT_TYPE, Body, Request, Response}; -use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder}; -use once_cell::sync::Lazy; -use regex::Regex; -use routerify::ext::RequestExt; -use routerify::{Middleware, RequestInfo, Router, RouterBuilder}; -use tokio::sync::{mpsc, Mutex, Notify}; -use tokio_stream::wrappers::ReceiverStream; -use tokio_util::io::ReaderStream; -use tracing::{debug, info, info_span, warn, Instrument}; -use utils::auth::{AuthError, Claims, SwappableJwtAuth}; - use std::future::Future; use std::io::Write as _; use std::str::FromStr; use std::time::Duration; +use ::pprof::ProfilerGuardBuilder; +use ::pprof::protos::Message as _; +use anyhow::{Context, anyhow}; +use bytes::{Bytes, BytesMut}; +use hyper::header::{AUTHORIZATION, CONTENT_DISPOSITION, CONTENT_TYPE, HeaderName}; +use hyper::http::HeaderValue; +use hyper::{Body, Method, Request, Response}; +use metrics::{Encoder, IntCounter, TextEncoder, register_int_counter}; +use once_cell::sync::Lazy; +use regex::Regex; +use routerify::ext::RequestExt; +use routerify::{Middleware, RequestInfo, Router, RouterBuilder}; +use tokio::sync::{Mutex, Notify, mpsc}; +use tokio_stream::wrappers::ReceiverStream; +use tokio_util::io::ReaderStream; +use tracing::{Instrument, debug, info, info_span, warn}; +use utils::auth::{AuthError, Claims, SwappableJwtAuth}; + +use crate::error::{ApiError, api_error_handler, route_error_handler}; +use crate::pprof; +use crate::request::{get_query_param, parse_query_param}; + static SERVE_METRICS_COUNT: Lazy = Lazy::new(|| { register_int_counter!( "libmetrics_metric_handler_requests_total", @@ -375,7 +375,7 @@ pub async fn profile_cpu_handler(req: Request) -> Result, A Err(_) => { return Err(ApiError::Conflict( "profiler already running (use ?force=true to cancel it)".into(), - )) + )); } } tokio::time::sleep(Duration::from_millis(1)).await; // don't busy-wait @@ -539,8 +539,8 @@ pub async fn profile_heap_handler(req: Request) -> Result, } } -pub fn add_request_id_middleware( -) -> Middleware { +pub fn add_request_id_middleware() +-> Middleware { Middleware::pre(move |req| async move { let request_id = match req.headers().get(&X_REQUEST_ID_HEADER) { Some(request_id) => request_id @@ -664,7 +664,7 @@ pub fn auth_middleware( None => { return Err(ApiError::Unauthorized( "missing authorization header".to_string(), - )) + )); } } } @@ -717,12 +717,14 @@ pub fn check_permission_with( #[cfg(test)] mod tests { - use super::*; - use hyper::service::Service; - use routerify::RequestServiceBuilder; use std::future::poll_fn; use std::net::{IpAddr, SocketAddr}; + use hyper::service::Service; + use routerify::RequestServiceBuilder; + + use super::*; + #[tokio::test] async fn test_request_id_returned() { let builder = RequestServiceBuilder::new(make_router().build().unwrap()).unwrap(); diff --git a/libs/http-utils/src/error.rs b/libs/http-utils/src/error.rs index 746305caec..f790dc26ca 100644 --- a/libs/http-utils/src/error.rs +++ b/libs/http-utils/src/error.rs @@ -1,10 +1,10 @@ -use hyper::{header, Body, Response, StatusCode}; -use serde::{Deserialize, Serialize}; use std::borrow::Cow; use std::error::Error as StdError; + +use hyper::{Body, Response, StatusCode, header}; +use serde::{Deserialize, Serialize}; use thiserror::Error; use tracing::{error, info, warn}; - use utils::auth::AuthError; #[derive(Debug, Error)] diff --git a/libs/http-utils/src/failpoints.rs b/libs/http-utils/src/failpoints.rs index 8a1e0c8cf0..984823f4a9 100644 --- a/libs/http-utils/src/failpoints.rs +++ b/libs/http-utils/src/failpoints.rs @@ -1,12 +1,11 @@ -use crate::error::ApiError; -use crate::json::{json_request, json_response}; - use hyper::{Body, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use tokio_util::sync::CancellationToken; - use utils::failpoint_support::apply_failpoint; +use crate::error::ApiError; +use crate::json::{json_request, json_response}; + pub type ConfigureFailpointsRequest = Vec; /// Information for configuring a single fail point diff --git a/libs/http-utils/src/json.rs b/libs/http-utils/src/json.rs index e53231f313..14ebac91e6 100644 --- a/libs/http-utils/src/json.rs +++ b/libs/http-utils/src/json.rs @@ -1,6 +1,6 @@ use anyhow::Context; use bytes::Buf; -use hyper::{header, Body, Request, Response, StatusCode}; +use hyper::{Body, Request, Response, StatusCode, header}; use serde::{Deserialize, Serialize}; use super::error::ApiError; diff --git a/libs/http-utils/src/lib.rs b/libs/http-utils/src/lib.rs index ae6a27aaa8..c692a54257 100644 --- a/libs/http-utils/src/lib.rs +++ b/libs/http-utils/src/lib.rs @@ -9,4 +9,4 @@ extern crate hyper0 as hyper; /// Current fast way to apply simple http routing in various Neon binaries. /// Re-exported for sake of uniform approach, that could be later replaced with better alternatives, if needed. -pub use routerify::{ext::RequestExt, RouterBuilder, RouterService}; +pub use routerify::{RouterBuilder, RouterService, ext::RequestExt}; diff --git a/libs/http-utils/src/pprof.rs b/libs/http-utils/src/pprof.rs index fe1cc10838..529017f350 100644 --- a/libs/http-utils/src/pprof.rs +++ b/libs/http-utils/src/pprof.rs @@ -1,15 +1,15 @@ -use anyhow::bail; -use flate2::write::{GzDecoder, GzEncoder}; -use flate2::Compression; -use itertools::Itertools as _; -use pprof::protos::{Function, Line, Location, Message as _, Profile}; -use regex::Regex; - use std::borrow::Cow; use std::collections::{HashMap, HashSet}; use std::ffi::c_void; use std::io::Write as _; +use anyhow::bail; +use flate2::Compression; +use flate2::write::{GzDecoder, GzEncoder}; +use itertools::Itertools as _; +use pprof::protos::{Function, Line, Location, Message as _, Profile}; +use regex::Regex; + /// Decodes a gzip-compressed Protobuf-encoded pprof profile. pub fn decode(bytes: &[u8]) -> anyhow::Result { let mut gz = GzDecoder::new(Vec::new()); diff --git a/libs/http-utils/src/request.rs b/libs/http-utils/src/request.rs index 7ea71685ec..9024a90a82 100644 --- a/libs/http-utils/src/request.rs +++ b/libs/http-utils/src/request.rs @@ -1,10 +1,13 @@ use core::fmt; -use std::{borrow::Cow, str::FromStr}; +use std::borrow::Cow; +use std::str::FromStr; + +use anyhow::anyhow; +use hyper::body::HttpBody; +use hyper::{Body, Request}; +use routerify::ext::RequestExt; use super::error::ApiError; -use anyhow::anyhow; -use hyper::{body::HttpBody, Body, Request}; -use routerify::ext::RequestExt; pub fn get_request_param<'a>( request: &'a Request, diff --git a/libs/metrics/src/hll.rs b/libs/metrics/src/hll.rs index 723916a742..93f6a2b7cc 100644 --- a/libs/metrics/src/hll.rs +++ b/libs/metrics/src/hll.rs @@ -6,17 +6,15 @@ //! Probabilistic cardinality estimators, such as the HyperLogLog algorithm, //! use significantly less memory than this, but can only approximate the cardinality. -use std::{ - hash::{BuildHasher, BuildHasherDefault, Hash}, - sync::atomic::AtomicU8, -}; +use std::hash::{BuildHasher, BuildHasherDefault, Hash}; +use std::sync::atomic::AtomicU8; -use measured::{ - label::{LabelGroupVisitor, LabelName, LabelValue, LabelVisitor}, - metric::{counter::CounterState, name::MetricNameEncoder, Metric, MetricType, MetricVec}, - text::TextEncoder, - LabelGroup, -}; +use measured::LabelGroup; +use measured::label::{LabelGroupVisitor, LabelName, LabelValue, LabelVisitor}; +use measured::metric::counter::CounterState; +use measured::metric::name::MetricNameEncoder; +use measured::metric::{Metric, MetricType, MetricVec}; +use measured::text::TextEncoder; use twox_hash::xxh3; /// Create an [`HyperLogLogVec`] and registers to default registry. @@ -27,9 +25,7 @@ macro_rules! register_hll_vec { $crate::register(Box::new(hll_vec.clone())).map(|_| hll_vec) }}; - ($N:literal, $NAME:expr, $HELP:expr, $LABELS_NAMES:expr $(,)?) => {{ - $crate::register_hll_vec!($N, $crate::opts!($NAME, $HELP), $LABELS_NAMES) - }}; + ($N:literal, $NAME:expr, $HELP:expr, $LABELS_NAMES:expr $(,)?) => {{ $crate::register_hll_vec!($N, $crate::opts!($NAME, $HELP), $LABELS_NAMES) }}; } /// Create an [`HyperLogLog`] and registers to default registry. @@ -40,9 +36,7 @@ macro_rules! register_hll { $crate::register(Box::new(hll.clone())).map(|_| hll) }}; - ($N:literal, $NAME:expr, $HELP:expr $(,)?) => {{ - $crate::register_hll!($N, $crate::opts!($NAME, $HELP)) - }}; + ($N:literal, $NAME:expr, $HELP:expr $(,)?) => {{ $crate::register_hll!($N, $crate::opts!($NAME, $HELP)) }}; } /// HLL is a probabilistic cardinality measure. @@ -195,8 +189,10 @@ impl measured::metric::MetricEncoding); diff --git a/libs/metrics/src/lib.rs b/libs/metrics/src/lib.rs index 0f6c2a0937..4df8d7bc51 100644 --- a/libs/metrics/src/lib.rs +++ b/libs/metrics/src/lib.rs @@ -4,38 +4,26 @@ //! a default registry. #![deny(clippy::undocumented_unsafe_blocks)] -use measured::{ - label::{LabelGroupSet, LabelGroupVisitor, LabelName, NoLabels}, - metric::{ - counter::CounterState, - gauge::GaugeState, - group::Encoding, - name::{MetricName, MetricNameEncoder}, - MetricEncoding, MetricFamilyEncoding, - }, - FixedCardinalityLabel, LabelGroup, MetricGroup, -}; +use measured::label::{LabelGroupSet, LabelGroupVisitor, LabelName, NoLabels}; +use measured::metric::counter::CounterState; +use measured::metric::gauge::GaugeState; +use measured::metric::group::Encoding; +use measured::metric::name::{MetricName, MetricNameEncoder}; +use measured::metric::{MetricEncoding, MetricFamilyEncoding}; +use measured::{FixedCardinalityLabel, LabelGroup, MetricGroup}; use once_cell::sync::Lazy; +use prometheus::Registry; use prometheus::core::{ Atomic, AtomicU64, Collector, GenericCounter, GenericCounterVec, GenericGauge, GenericGaugeVec, }; pub use prometheus::local::LocalHistogram; -pub use prometheus::opts; -pub use prometheus::register; -pub use prometheus::Error; -use prometheus::Registry; -pub use prometheus::{core, default_registry, proto}; -pub use prometheus::{exponential_buckets, linear_buckets}; -pub use prometheus::{register_counter_vec, Counter, CounterVec}; -pub use prometheus::{register_gauge, Gauge}; -pub use prometheus::{register_gauge_vec, GaugeVec}; -pub use prometheus::{register_histogram, Histogram}; -pub use prometheus::{register_histogram_vec, HistogramVec}; -pub use prometheus::{register_int_counter, IntCounter}; -pub use prometheus::{register_int_counter_vec, IntCounterVec}; -pub use prometheus::{register_int_gauge, IntGauge}; -pub use prometheus::{register_int_gauge_vec, IntGaugeVec}; -pub use prometheus::{Encoder, TextEncoder}; +pub use prometheus::{ + Counter, CounterVec, Encoder, Error, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter, + IntCounterVec, IntGauge, IntGaugeVec, TextEncoder, core, default_registry, exponential_buckets, + linear_buckets, opts, proto, register, register_counter_vec, register_gauge, + register_gauge_vec, register_histogram, register_histogram_vec, register_int_counter, + register_int_counter_vec, register_int_gauge, register_int_gauge_vec, +}; pub mod launch_timestamp; mod wrappers; diff --git a/libs/postgres_backend/src/lib.rs b/libs/postgres_backend/src/lib.rs index f74b229ac4..a0a891f0dc 100644 --- a/libs/postgres_backend/src/lib.rs +++ b/libs/postgres_backend/src/lib.rs @@ -4,28 +4,28 @@ //! is rather narrow, but we can extend it once required. #![deny(unsafe_code)] #![deny(clippy::undocumented_unsafe_blocks)] -use anyhow::Context; -use bytes::Bytes; -use serde::{Deserialize, Serialize}; +use std::future::Future; use std::io::ErrorKind; use std::net::SocketAddr; -use std::os::fd::AsRawFd; -use std::os::fd::RawFd; +use std::os::fd::{AsRawFd, RawFd}; use std::pin::Pin; +use std::str::FromStr; use std::sync::Arc; -use std::task::{ready, Poll}; +use std::task::{Poll, ready}; use std::{fmt, io}; -use std::{future::Future, str::FromStr}; -use tokio::io::{AsyncRead, AsyncWrite}; -use tokio_rustls::TlsAcceptor; -use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info, trace, warn}; +use anyhow::Context; +use bytes::Bytes; use pq_proto::framed::{ConnectionError, Framed, FramedReader, FramedWriter}; use pq_proto::{ BeMessage, FeMessage, FeStartupPacket, ProtocolError, SQLSTATE_ADMIN_SHUTDOWN, SQLSTATE_INTERNAL_ERROR, SQLSTATE_SUCCESSFUL_COMPLETION, }; +use serde::{Deserialize, Serialize}; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_rustls::TlsAcceptor; +use tokio_util::sync::CancellationToken; +use tracing::{debug, error, info, trace, warn}; /// An error, occurred during query processing: /// either during the connection ([`ConnectionError`]) or before/after it. @@ -746,7 +746,7 @@ impl PostgresBackend { match e { QueryError::Shutdown => return Ok(ProcessMsgResult::Break), QueryError::SimulatedConnectionError => { - return Err(QueryError::SimulatedConnectionError) + return Err(QueryError::SimulatedConnectionError); } err @ QueryError::Reconnect => { // Instruct the client to reconnect, stop processing messages @@ -1020,7 +1020,9 @@ fn log_query_error(query: &str, e: &QueryError) { } } QueryError::Disconnected(other_connection_error) => { - error!("query handler for '{query}' failed with connection error: {other_connection_error:?}") + error!( + "query handler for '{query}' failed with connection error: {other_connection_error:?}" + ) } QueryError::SimulatedConnectionError => { error!("query handler for query '{query}' failed due to a simulated connection error") diff --git a/libs/postgres_backend/tests/simple_select.rs b/libs/postgres_backend/tests/simple_select.rs index 3fcfbf4a03..907ef9eed3 100644 --- a/libs/postgres_backend/tests/simple_select.rs +++ b/libs/postgres_backend/tests/simple_select.rs @@ -1,10 +1,11 @@ +use std::io::Cursor; +use std::sync::Arc; + /// Test postgres_backend_async with tokio_postgres use once_cell::sync::Lazy; use postgres_backend::{AuthType, Handler, PostgresBackend, QueryError}; use pq_proto::{BeMessage, RowDescriptor}; use rustls::crypto::ring; -use std::io::Cursor; -use std::sync::Arc; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::{TcpListener, TcpStream}; use tokio_postgres::config::SslMode; diff --git a/libs/postgres_connection/src/lib.rs b/libs/postgres_connection/src/lib.rs index e3d31c6cfc..cd981b3729 100644 --- a/libs/postgres_connection/src/lib.rs +++ b/libs/postgres_connection/src/lib.rs @@ -1,9 +1,10 @@ #![deny(unsafe_code)] #![deny(clippy::undocumented_unsafe_blocks)] -use anyhow::{bail, Context}; -use itertools::Itertools; use std::borrow::Cow; use std::fmt; + +use anyhow::{Context, bail}; +use itertools::Itertools; use url::Host; /// Parses a string of format either `host:port` or `host` into a corresponding pair. @@ -29,9 +30,10 @@ pub fn parse_host_port>(host_port: S) -> Result<(Host, Option #[cfg(test)] mod tests_parse_host_port { - use crate::parse_host_port; use url::Host; + use crate::parse_host_port; + #[test] fn test_normal() { let (host, port) = parse_host_port("hello:123").unwrap(); @@ -207,10 +209,11 @@ impl fmt::Debug for PgConnectionConfig { #[cfg(test)] mod tests_pg_connection_config { - use crate::PgConnectionConfig; use once_cell::sync::Lazy; use url::Host; + use crate::PgConnectionConfig; + static STUB_HOST: Lazy = Lazy::new(|| Host::Domain("stub.host.example".to_owned())); #[test] diff --git a/libs/postgres_ffi/benches/waldecoder.rs b/libs/postgres_ffi/benches/waldecoder.rs index c8cf0d322a..2e1d62e452 100644 --- a/libs/postgres_ffi/benches/waldecoder.rs +++ b/libs/postgres_ffi/benches/waldecoder.rs @@ -1,6 +1,6 @@ use std::ffi::CStr; -use criterion::{criterion_group, criterion_main, Bencher, Criterion}; +use criterion::{Bencher, Criterion, criterion_group, criterion_main}; use postgres_ffi::v17::wal_generator::LogicalMessageGenerator; use postgres_ffi::v17::waldecoder_handler::WalStreamDecoderHandler; use postgres_ffi::waldecoder::WalStreamDecoder; diff --git a/libs/postgres_ffi/build.rs b/libs/postgres_ffi/build.rs index d3a85f2683..cdebd43f6f 100644 --- a/libs/postgres_ffi/build.rs +++ b/libs/postgres_ffi/build.rs @@ -4,7 +4,7 @@ use std::env; use std::path::PathBuf; use std::process::Command; -use anyhow::{anyhow, Context}; +use anyhow::{Context, anyhow}; use bindgen::callbacks::{DeriveInfo, ParseCallbacks}; #[derive(Debug)] diff --git a/libs/postgres_ffi/src/lib.rs b/libs/postgres_ffi/src/lib.rs index 301bc2f16e..8dfd8d8750 100644 --- a/libs/postgres_ffi/src/lib.rs +++ b/libs/postgres_ffi/src/lib.rs @@ -21,7 +21,9 @@ macro_rules! postgres_ffi { pub mod bindings { // bindgen generates bindings for a lot of stuff we don't need #![allow(dead_code)] + #![allow(unsafe_op_in_unsafe_fn)] #![allow(clippy::undocumented_unsafe_blocks)] + #![allow(clippy::ptr_offset_with_cast)] use serde::{Deserialize, Serialize}; include!(concat!( @@ -43,8 +45,7 @@ macro_rules! postgres_ffi { pub const PG_MAJORVERSION: &str = stringify!($version); // Re-export some symbols from bindings - pub use bindings::DBState_DB_SHUTDOWNED; - pub use bindings::{CheckPoint, ControlFileData, XLogRecord}; + pub use bindings::{CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, XLogRecord}; pub const ZERO_CHECKPOINT: bytes::Bytes = bytes::Bytes::from_static(&[0u8; xlog_utils::SIZEOF_CHECKPOINT]); @@ -221,21 +222,17 @@ pub mod relfile_utils; pub mod walrecord; // Export some widely used datatypes that are unlikely to change across Postgres versions -pub use v14::bindings::RepOriginId; -pub use v14::bindings::{uint32, uint64, Oid}; -pub use v14::bindings::{BlockNumber, OffsetNumber}; -pub use v14::bindings::{MultiXactId, TransactionId}; -pub use v14::bindings::{TimeLineID, TimestampTz, XLogRecPtr, XLogSegNo}; - +pub use v14::bindings::{ + BlockNumber, CheckPoint, ControlFileData, MultiXactId, OffsetNumber, Oid, PageHeaderData, + RepOriginId, TimeLineID, TimestampTz, TransactionId, XLogRecPtr, XLogRecord, XLogSegNo, uint32, + uint64, +}; // Likewise for these, although the assumption that these don't change is a little more iffy. pub use v14::bindings::{MultiXactOffset, MultiXactStatus}; -pub use v14::bindings::{PageHeaderData, XLogRecord}; pub use v14::xlog_utils::{ XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD, }; -pub use v14::bindings::{CheckPoint, ControlFileData}; - // from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and // --with-segsize=SEGSIZE, but assume the defaults for now. pub const BLCKSZ: u16 = 8192; @@ -246,13 +243,11 @@ pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024; pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16; // Export some version independent functions that are used outside of this mod -pub use v14::xlog_utils::encode_logical_message; -pub use v14::xlog_utils::get_current_timestamp; -pub use v14::xlog_utils::to_pg_timestamp; -pub use v14::xlog_utils::try_from_pg_timestamp; -pub use v14::xlog_utils::XLogFileName; - pub use v14::bindings::DBState_DB_SHUTDOWNED; +pub use v14::xlog_utils::{ + XLogFileName, encode_logical_message, get_current_timestamp, to_pg_timestamp, + try_from_pg_timestamp, +}; pub fn bkpimage_is_compressed(bimg_info: u8, version: u32) -> bool { dispatch_pgversion!(version, pgv::bindings::bkpimg_is_compressed(bimg_info)) @@ -355,8 +350,9 @@ pub fn fsm_logical_to_physical(addr: BlockNumber) -> BlockNumber { } pub mod waldecoder { - use bytes::{Buf, Bytes, BytesMut}; use std::num::NonZeroU32; + + use bytes::{Buf, Bytes, BytesMut}; use thiserror::Error; use utils::lsn::Lsn; diff --git a/libs/postgres_ffi/src/pg_constants.rs b/libs/postgres_ffi/src/pg_constants.rs index e343473d77..b0bdd8a8da 100644 --- a/libs/postgres_ffi/src/pg_constants.rs +++ b/libs/postgres_ffi/src/pg_constants.rs @@ -9,8 +9,7 @@ //! comments on them. //! -use crate::PageHeaderData; -use crate::BLCKSZ; +use crate::{BLCKSZ, PageHeaderData}; // // From pg_tablespace_d.h diff --git a/libs/postgres_ffi/src/walrecord.rs b/libs/postgres_ffi/src/walrecord.rs index fce37e2fdd..1ccf4590a9 100644 --- a/libs/postgres_ffi/src/walrecord.rs +++ b/libs/postgres_ffi/src/walrecord.rs @@ -3,18 +3,16 @@ //! //! TODO: Generate separate types for each supported PG version -use crate::pg_constants; -use crate::XLogRecord; -use crate::{ - BlockNumber, MultiXactId, MultiXactOffset, MultiXactStatus, Oid, RepOriginId, TimestampTz, - TransactionId, -}; -use crate::{BLCKSZ, XLOG_SIZE_OF_XLOG_RECORD}; use bytes::{Buf, Bytes}; use serde::{Deserialize, Serialize}; use utils::bin_ser::DeserializeError; use utils::lsn::Lsn; +use crate::{ + BLCKSZ, BlockNumber, MultiXactId, MultiXactOffset, MultiXactStatus, Oid, RepOriginId, + TimestampTz, TransactionId, XLOG_SIZE_OF_XLOG_RECORD, XLogRecord, pg_constants, +}; + #[repr(C)] #[derive(Clone, Debug, Serialize, Deserialize)] pub struct XlMultiXactCreate { @@ -508,9 +506,10 @@ pub fn decode_wal_record( } pub mod v14 { - use crate::{OffsetNumber, TransactionId}; use bytes::{Buf, Bytes}; + use crate::{OffsetNumber, TransactionId}; + #[repr(C)] #[derive(Debug)] pub struct XlHeapInsert { @@ -678,9 +677,10 @@ pub mod v15 { } pub mod v16 { + use bytes::{Buf, Bytes}; + pub use super::v14::{XlHeapInsert, XlHeapLockUpdated, XlHeapMultiInsert, XlParameterChange}; use crate::{OffsetNumber, TransactionId}; - use bytes::{Buf, Bytes}; pub struct XlHeapDelete { pub xmax: TransactionId, @@ -746,9 +746,10 @@ pub mod v16 { /* Since PG16, we have the Neon RMGR (RM_NEON_ID) to manage Neon-flavored WAL. */ pub mod rm_neon { - use crate::{OffsetNumber, TransactionId}; use bytes::{Buf, Bytes}; + use crate::{OffsetNumber, TransactionId}; + #[repr(C)] #[derive(Debug)] pub struct XlNeonHeapInsert { @@ -858,14 +859,14 @@ pub mod v16 { } pub mod v17 { - pub use super::v14::XlHeapLockUpdated; - pub use crate::{TimeLineID, TimestampTz}; use bytes::{Buf, Bytes}; - pub use super::v16::rm_neon; + pub use super::v14::XlHeapLockUpdated; pub use super::v16::{ XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapMultiInsert, XlHeapUpdate, XlParameterChange, + rm_neon, }; + pub use crate::{TimeLineID, TimestampTz}; #[repr(C)] #[derive(Debug)] diff --git a/libs/postgres_ffi/wal_craft/src/bin/wal_craft.rs b/libs/postgres_ffi/wal_craft/src/bin/wal_craft.rs index 41afcea6c2..6151ce34ac 100644 --- a/libs/postgres_ffi/wal_craft/src/bin/wal_craft.rs +++ b/libs/postgres_ffi/wal_craft/src/bin/wal_craft.rs @@ -1,7 +1,9 @@ +use std::path::PathBuf; +use std::str::FromStr; + use anyhow::*; -use clap::{value_parser, Arg, ArgMatches, Command}; +use clap::{Arg, ArgMatches, Command, value_parser}; use postgres::Client; -use std::{path::PathBuf, str::FromStr}; use wal_craft::*; fn main() -> Result<()> { diff --git a/libs/postgres_ffi/wal_craft/src/lib.rs b/libs/postgres_ffi/wal_craft/src/lib.rs index 77dff4ac99..ca9530faef 100644 --- a/libs/postgres_ffi/wal_craft/src/lib.rs +++ b/libs/postgres_ffi/wal_craft/src/lib.rs @@ -1,17 +1,18 @@ -use anyhow::{bail, ensure}; -use camino_tempfile::{tempdir, Utf8TempDir}; -use log::*; -use postgres::types::PgLsn; -use postgres::Client; -use postgres_ffi::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ}; -use postgres_ffi::{ - XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD, -}; use std::ffi::OsStr; use std::path::{Path, PathBuf}; use std::process::Command; use std::time::{Duration, Instant}; +use anyhow::{bail, ensure}; +use camino_tempfile::{Utf8TempDir, tempdir}; +use log::*; +use postgres::Client; +use postgres::types::PgLsn; +use postgres_ffi::{ + WAL_SEGMENT_SIZE, XLOG_BLCKSZ, XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, + XLOG_SIZE_OF_XLOG_SHORT_PHD, +}; + macro_rules! xlog_utils_test { ($version:ident) => { #[path = "."] diff --git a/libs/pq_proto/src/framed.rs b/libs/pq_proto/src/framed.rs index ccbb90e384..8e216d0f44 100644 --- a/libs/pq_proto/src/framed.rs +++ b/libs/pq_proto/src/framed.rs @@ -10,11 +10,10 @@ //! calls. //! //! [Box]: https://docs.rs/futures-util/0.3.26/src/futures_util/lock/bilock.rs.html#107 +use std::future::Future; +use std::io::{self, ErrorKind}; + use bytes::{Buf, BytesMut}; -use std::{ - future::Future, - io::{self, ErrorKind}, -}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf}; use crate::{BeMessage, FeMessage, FeStartupPacket, ProtocolError}; diff --git a/libs/pq_proto/src/lib.rs b/libs/pq_proto/src/lib.rs index f99128b76a..e435ffbf7e 100644 --- a/libs/pq_proto/src/lib.rs +++ b/libs/pq_proto/src/lib.rs @@ -5,14 +5,15 @@ pub mod framed; +use std::borrow::Cow; +use std::{fmt, io, str}; + use byteorder::{BigEndian, ReadBytesExt}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use itertools::Itertools; -use serde::{Deserialize, Serialize}; -use std::{borrow::Cow, fmt, io, str}; - // re-export for use in utils pageserver_feedback.rs pub use postgres_protocol::PG_EPOCH; +use serde::{Deserialize, Serialize}; pub type Oid = u32; pub type SystemId = u64; @@ -206,8 +207,8 @@ use rand::distributions::{Distribution, Standard}; impl Distribution for Standard { fn sample(&self, rng: &mut R) -> CancelKeyData { CancelKeyData { - backend_pid: rng.gen(), - cancel_key: rng.gen(), + backend_pid: rng.r#gen(), + cancel_key: rng.r#gen(), } } } @@ -1035,7 +1036,7 @@ impl BeMessage<'_> { buf.put_u8(b'd'); write_body(buf, |buf| { buf.put_u8(b'0'); // matches INTERPRETED_WAL_RECORD_TAG in postgres-protocol - // dependency + // dependency buf.put_u64(rec.streaming_lsn); buf.put_u64(rec.commit_lsn); buf.put_slice(rec.data); diff --git a/libs/tenant_size_model/src/calculation.rs b/libs/tenant_size_model/src/calculation.rs index be00562219..d54876ba2c 100644 --- a/libs/tenant_size_model/src/calculation.rs +++ b/libs/tenant_size_model/src/calculation.rs @@ -130,11 +130,7 @@ impl StorageModel { break; } } - if possible { - Some(snapshot_later) - } else { - None - } + if possible { Some(snapshot_later) } else { None } } else { None }; diff --git a/libs/tenant_size_model/src/svg.rs b/libs/tenant_size_model/src/svg.rs index 25ebb1c3d8..a3bc937f52 100644 --- a/libs/tenant_size_model/src/svg.rs +++ b/libs/tenant_size_model/src/svg.rs @@ -76,7 +76,10 @@ pub fn draw_svg( let mut result = String::new(); - writeln!(result, "")?; + writeln!( + result, + "" + )?; draw.calculate_svg_layout(); diff --git a/libs/tracing-utils/src/http.rs b/libs/tracing-utils/src/http.rs index 2168beee88..8560d0718c 100644 --- a/libs/tracing-utils/src/http.rs +++ b/libs/tracing-utils/src/http.rs @@ -1,8 +1,8 @@ //! Tracing wrapper for Hyper HTTP server -use hyper0::HeaderMap; -use hyper0::{Body, Request, Response}; use std::future::Future; + +use hyper0::{Body, HeaderMap, Request, Response}; use tracing::Instrument; use tracing_opentelemetry::OpenTelemetrySpanExt; diff --git a/libs/tracing-utils/src/lib.rs b/libs/tracing-utils/src/lib.rs index 818d759eac..72f94d61e4 100644 --- a/libs/tracing-utils/src/lib.rs +++ b/libs/tracing-utils/src/lib.rs @@ -36,11 +36,11 @@ pub mod http; -use opentelemetry::trace::TracerProvider; use opentelemetry::KeyValue; +use opentelemetry::trace::TracerProvider; use tracing::Subscriber; -use tracing_subscriber::registry::LookupSpan; use tracing_subscriber::Layer; +use tracing_subscriber::registry::LookupSpan; /// Set up OpenTelemetry exporter, using configuration from environment variables. /// diff --git a/libs/utils/benches/benchmarks.rs b/libs/utils/benches/benchmarks.rs index 348e27ac47..12c620ec87 100644 --- a/libs/utils/benches/benchmarks.rs +++ b/libs/utils/benches/benchmarks.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use criterion::{criterion_group, criterion_main, Bencher, Criterion}; +use criterion::{Bencher, Criterion, criterion_group, criterion_main}; use pprof::criterion::{Output, PProfProfiler}; use utils::id; use utils::logging::log_slow; diff --git a/libs/utils/src/auth.rs b/libs/utils/src/auth.rs index 4bfd0ab055..cc5b0b1d13 100644 --- a/libs/utils/src/auth.rs +++ b/libs/utils/src/auth.rs @@ -1,12 +1,15 @@ // For details about authentication see docs/authentication.md -use arc_swap::ArcSwap; -use std::{borrow::Cow, fmt::Display, fs, sync::Arc}; +use std::borrow::Cow; +use std::fmt::Display; +use std::fs; +use std::sync::Arc; use anyhow::Result; +use arc_swap::ArcSwap; use camino::Utf8Path; use jsonwebtoken::{ - decode, encode, Algorithm, DecodingKey, EncodingKey, Header, TokenData, Validation, + Algorithm, DecodingKey, EncodingKey, Header, TokenData, Validation, decode, encode, }; use serde::{Deserialize, Serialize}; @@ -129,7 +132,9 @@ impl JwtAuth { anyhow::bail!("path is neither a directory or a file") }; if decoding_keys.is_empty() { - anyhow::bail!("Configured for JWT auth with zero decoding keys. All JWT gated requests would be rejected."); + anyhow::bail!( + "Configured for JWT auth with zero decoding keys. All JWT gated requests would be rejected." + ); } Ok(Self::new(decoding_keys)) } @@ -175,9 +180,10 @@ pub fn encode_from_key_file(claims: &Claims, key_data: &[u8]) -> Result #[cfg(test)] mod tests { - use super::*; use std::str::FromStr; + use super::*; + // Generated with: // // openssl genpkey -algorithm ed25519 -out ed25519-priv.pem @@ -215,7 +221,9 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH let encoded_eddsa = "eyJhbGciOiJFZERTQSIsInR5cCI6IkpXVCJ9.eyJzY29wZSI6InRlbmFudCIsInRlbmFudF9pZCI6IjNkMWY3NTk1YjQ2ODIzMDMwNGUwYjczY2VjYmNiMDgxIiwiaXNzIjoibmVvbi5jb250cm9scGxhbmUiLCJpYXQiOjE2Nzg0NDI0Nzl9.rNheBnluMJNgXzSTTJoTNIGy4P_qe0JUHl_nVEGuDCTgHOThPVr552EnmKccrCKquPeW3c2YUk0Y9Oh4KyASAw"; // Check it can be validated with the public key - let auth = JwtAuth::new(vec![DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519).unwrap()]); + let auth = JwtAuth::new(vec![ + DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519).unwrap(), + ]); let claims_from_token = auth.decode(encoded_eddsa).unwrap().claims; assert_eq!(claims_from_token, expected_claims); } @@ -230,7 +238,9 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH let encoded = encode_from_key_file(&claims, TEST_PRIV_KEY_ED25519).unwrap(); // decode it back - let auth = JwtAuth::new(vec![DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519).unwrap()]); + let auth = JwtAuth::new(vec![ + DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519).unwrap(), + ]); let decoded = auth.decode(&encoded).unwrap(); assert_eq!(decoded.claims, claims); diff --git a/libs/utils/src/backoff.rs b/libs/utils/src/backoff.rs index e6503fe377..4a4c4eedbb 100644 --- a/libs/utils/src/backoff.rs +++ b/libs/utils/src/backoff.rs @@ -121,10 +121,12 @@ where #[cfg(test)] mod tests { - use super::*; use std::io; + use tokio::sync::Mutex; + use super::*; + #[test] fn backoff_defaults_produce_growing_backoff_sequence() { let mut current_backoff_value = None; diff --git a/libs/utils/src/bin_ser.rs b/libs/utils/src/bin_ser.rs index 4d173d0726..2861baeee5 100644 --- a/libs/utils/src/bin_ser.rs +++ b/libs/utils/src/bin_ser.rs @@ -13,9 +13,11 @@ #![warn(missing_docs)] -use bincode::Options; -use serde::{de::DeserializeOwned, Serialize}; use std::io::{self, Read, Write}; + +use bincode::Options; +use serde::Serialize; +use serde::de::DeserializeOwned; use thiserror::Error; /// An error that occurred during a deserialize operation @@ -261,10 +263,12 @@ impl LeSer for T {} #[cfg(test)] mod tests { - use super::DeserializeError; - use serde::{Deserialize, Serialize}; use std::io::Cursor; + use serde::{Deserialize, Serialize}; + + use super::DeserializeError; + #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct ShortStruct { a: u8, diff --git a/libs/utils/src/circuit_breaker.rs b/libs/utils/src/circuit_breaker.rs index e1ddfd8650..46a6584d66 100644 --- a/libs/utils/src/circuit_breaker.rs +++ b/libs/utils/src/circuit_breaker.rs @@ -1,7 +1,5 @@ -use std::{ - fmt::Display, - time::{Duration, Instant}, -}; +use std::fmt::Display; +use std::time::{Duration, Instant}; use metrics::IntCounter; diff --git a/libs/utils/src/completion.rs b/libs/utils/src/completion.rs index f65c080ad4..973d754715 100644 --- a/libs/utils/src/completion.rs +++ b/libs/utils/src/completion.rs @@ -1,4 +1,5 @@ -use tokio_util::task::{task_tracker::TaskTrackerToken, TaskTracker}; +use tokio_util::task::TaskTracker; +use tokio_util::task::task_tracker::TaskTrackerToken; /// While a reference is kept around, the associated [`Barrier::wait`] will wait. /// diff --git a/libs/utils/src/crashsafe.rs b/libs/utils/src/crashsafe.rs index 5241ab183c..290a5b2686 100644 --- a/libs/utils/src/crashsafe.rs +++ b/libs/utils/src/crashsafe.rs @@ -1,9 +1,7 @@ +use std::borrow::Cow; +use std::fs::{self, File}; +use std::io::{self, Write}; use std::os::fd::AsRawFd; -use std::{ - borrow::Cow, - fs::{self, File}, - io::{self, Write}, -}; use camino::{Utf8Path, Utf8PathBuf}; diff --git a/libs/utils/src/env.rs b/libs/utils/src/env.rs index a1bcec9229..2a85f54a01 100644 --- a/libs/utils/src/env.rs +++ b/libs/utils/src/env.rs @@ -1,6 +1,7 @@ //! Wrapper around `std::env::var` for parsing environment variables. -use std::{fmt::Display, str::FromStr}; +use std::fmt::Display; +use std::str::FromStr; /// For types `V` that implement [`FromStr`]. pub fn var(varname: &str) -> Option diff --git a/libs/utils/src/failpoint_support.rs b/libs/utils/src/failpoint_support.rs index fc998ad9a9..ce014eb0ac 100644 --- a/libs/utils/src/failpoint_support.rs +++ b/libs/utils/src/failpoint_support.rs @@ -127,6 +127,9 @@ pub async fn failpoint_sleep_cancellable_helper( tracing::info!("failpoint {:?}: sleep done", name); } +/// Initialize the configured failpoints +/// +/// You must call this function before any concurrent threads do operations. pub fn init() -> fail::FailScenario<'static> { // The failpoints lib provides support for parsing the `FAILPOINTS` env var. // We want non-default behavior for `exit`, though, so, we handle it separately. @@ -134,7 +137,10 @@ pub fn init() -> fail::FailScenario<'static> { // Format for FAILPOINTS is "name=actions" separated by ";". let actions = std::env::var("FAILPOINTS"); if actions.is_ok() { - std::env::remove_var("FAILPOINTS"); + // SAFETY: this function should before any threads start and access env vars concurrently + unsafe { + std::env::remove_var("FAILPOINTS"); + } } else { // let the library handle non-utf8, or nothing for not present } diff --git a/libs/utils/src/fs_ext.rs b/libs/utils/src/fs_ext.rs index 8e53d2c79b..a406ab0378 100644 --- a/libs/utils/src/fs_ext.rs +++ b/libs/utils/src/fs_ext.rs @@ -58,9 +58,8 @@ where #[cfg(test)] mod test { - use crate::fs_ext::{is_directory_empty, list_dir}; - use super::ignore_absent_files; + use crate::fs_ext::{is_directory_empty, list_dir}; #[test] fn is_empty_dir() { diff --git a/libs/utils/src/fs_ext/rename_noreplace.rs b/libs/utils/src/fs_ext/rename_noreplace.rs index 897e30d7f1..fc6f794b57 100644 --- a/libs/utils/src/fs_ext/rename_noreplace.rs +++ b/libs/utils/src/fs_ext/rename_noreplace.rs @@ -38,7 +38,8 @@ pub fn rename_noreplace( #[cfg(test)] mod test { - use std::{fs, path::PathBuf}; + use std::fs; + use std::path::PathBuf; use super::*; diff --git a/libs/utils/src/generation.rs b/libs/utils/src/generation.rs index 44565ee6a2..b5e4a4644a 100644 --- a/libs/utils/src/generation.rs +++ b/libs/utils/src/generation.rs @@ -169,9 +169,9 @@ mod test { ]; let mut s = String::new(); - for (line, gen, expected) in examples { + for (line, gen_, expected) in examples { s.clear(); - write!(s, "{}", &gen.get_suffix()).expect("string grows"); + write!(s, "{}", &gen_.get_suffix()).expect("string grows"); assert_eq!(s, expected, "example on {line}"); } } diff --git a/libs/utils/src/guard_arc_swap.rs b/libs/utils/src/guard_arc_swap.rs index cec5202460..26cd640d3b 100644 --- a/libs/utils/src/guard_arc_swap.rs +++ b/libs/utils/src/guard_arc_swap.rs @@ -1,8 +1,9 @@ //! A wrapper around `ArcSwap` that ensures there is only one writer at a time and writes //! don't block reads. -use arc_swap::ArcSwap; use std::sync::Arc; + +use arc_swap::ArcSwap; use tokio::sync::TryLockError; pub struct GuardArcSwap { diff --git a/libs/utils/src/id.rs b/libs/utils/src/id.rs index eb91839504..6016c23a01 100644 --- a/libs/utils/src/id.rs +++ b/libs/utils/src/id.rs @@ -1,5 +1,6 @@ +use std::fmt; use std::num::ParseIntError; -use std::{fmt, str::FromStr}; +use std::str::FromStr; use anyhow::Context; use hex::FromHex; @@ -215,7 +216,7 @@ macro_rules! id_newtype { impl AsRef<[u8]> for $t { fn as_ref(&self) -> &[u8] { - &self.0 .0 + &self.0.0 } } @@ -367,9 +368,8 @@ impl FromStr for NodeId { mod tests { use serde_assert::{Deserializer, Serializer, Token, Tokens}; - use crate::bin_ser::BeSer; - use super::*; + use crate::bin_ser::BeSer; #[test] fn test_id_serde_non_human_readable() { diff --git a/libs/utils/src/leaky_bucket.rs b/libs/utils/src/leaky_bucket.rs index 0cc58738c0..2398f92766 100644 --- a/libs/utils/src/leaky_bucket.rs +++ b/libs/utils/src/leaky_bucket.rs @@ -21,15 +21,12 @@ //! //! Another explaination can be found here: -use std::{ - sync::{ - atomic::{AtomicU64, Ordering}, - Mutex, - }, - time::Duration, -}; +use std::sync::Mutex; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::Duration; -use tokio::{sync::Notify, time::Instant}; +use tokio::sync::Notify; +use tokio::time::Instant; pub struct LeakyBucketConfig { /// This is the "time cost" of a single request unit. diff --git a/libs/utils/src/linux_socket_ioctl.rs b/libs/utils/src/linux_socket_ioctl.rs index 5ae0e86af8..766529838c 100644 --- a/libs/utils/src/linux_socket_ioctl.rs +++ b/libs/utils/src/linux_socket_ioctl.rs @@ -2,21 +2,23 @@ //! //! -use std::{ - io, - mem::MaybeUninit, - os::{fd::RawFd, raw::c_int}, -}; +use std::io; +use std::mem::MaybeUninit; +use std::os::fd::RawFd; +use std::os::raw::c_int; use nix::libc::{FIONREAD, TIOCOUTQ}; unsafe fn do_ioctl(socket_fd: RawFd, cmd: nix::libc::Ioctl) -> io::Result { let mut inq: MaybeUninit = MaybeUninit::uninit(); - let err = nix::libc::ioctl(socket_fd, cmd, inq.as_mut_ptr()); - if err == 0 { - Ok(inq.assume_init()) - } else { - Err(io::Error::last_os_error()) + // SAFETY: encapsulating fn is unsafe, we require `socket_fd` to be a valid file descriptor + unsafe { + let err = nix::libc::ioctl(socket_fd, cmd, inq.as_mut_ptr()); + if err == 0 { + Ok(inq.assume_init()) + } else { + Err(io::Error::last_os_error()) + } } } @@ -24,12 +26,14 @@ unsafe fn do_ioctl(socket_fd: RawFd, cmd: nix::libc::Ioctl) -> io::Result /// /// Caller must ensure that `socket_fd` is a valid TCP socket file descriptor. pub unsafe fn inq(socket_fd: RawFd) -> io::Result { - do_ioctl(socket_fd, FIONREAD) + // SAFETY: encapsulating fn is unsafe + unsafe { do_ioctl(socket_fd, FIONREAD) } } /// # Safety /// /// Caller must ensure that `socket_fd` is a valid TCP socket file descriptor. pub unsafe fn outq(socket_fd: RawFd) -> io::Result { - do_ioctl(socket_fd, TIOCOUTQ) + // SAFETY: encapsulating fn is unsafe + unsafe { do_ioctl(socket_fd, TIOCOUTQ) } } diff --git a/libs/utils/src/lock_file.rs b/libs/utils/src/lock_file.rs index 3a2ed3e830..6aeeeca021 100644 --- a/libs/utils/src/lock_file.rs +++ b/libs/utils/src/lock_file.rs @@ -6,16 +6,15 @@ //! there for potential pitfalls with lock files that are used //! to store PIDs (pidfiles). -use std::{ - fs, - io::{Read, Write}, - ops::Deref, - os::unix::prelude::AsRawFd, -}; +use std::fs; +use std::io::{Read, Write}; +use std::ops::Deref; +use std::os::unix::prelude::AsRawFd; use anyhow::Context; use camino::{Utf8Path, Utf8PathBuf}; -use nix::{errno::Errno::EAGAIN, fcntl}; +use nix::errno::Errno::EAGAIN; +use nix::fcntl; use crate::crashsafe; diff --git a/libs/utils/src/logging.rs b/libs/utils/src/logging.rs index 2c36942f43..881f1e765d 100644 --- a/libs/utils/src/logging.rs +++ b/libs/utils/src/logging.rs @@ -273,7 +273,9 @@ fn log_panic_to_stderr( location: Option>, backtrace: &std::backtrace::Backtrace, ) { - eprintln!("panic while tracing is unconfigured: thread '{thread}' panicked at '{msg}', {location:?}\nStack backtrace:\n{backtrace}"); + eprintln!( + "panic while tracing is unconfigured: thread '{thread}' panicked at '{msg}', {location:?}\nStack backtrace:\n{backtrace}" + ); } struct PrettyLocation<'a, 'b>(&'a std::panic::Location<'b>); @@ -361,7 +363,8 @@ pub async fn log_slow(name: &str, threshold: Duration, f: impl Future= num { - Ok(()) - } else { - Err(cnt) - } + if cnt >= num { Ok(()) } else { Err(cnt) } } /// Register and return a channel that will be notified when a number arrives, @@ -325,9 +322,10 @@ where #[cfg(test)] mod tests { - use super::*; use std::sync::Arc; + use super::*; + impl MonotonicCounter for i32 { fn cnt_advance(&mut self, val: i32) { assert!(*self <= val); diff --git a/libs/utils/src/serde_percent.rs b/libs/utils/src/serde_percent.rs index 36e874a161..ca1e7aa25b 100644 --- a/libs/utils/src/serde_percent.rs +++ b/libs/utils/src/serde_percent.rs @@ -12,11 +12,7 @@ pub struct Percent(#[serde(deserialize_with = "deserialize_pct_0_to_100")] u8); impl Percent { pub const fn new(pct: u8) -> Option { - if pct <= 100 { - Some(Percent(pct)) - } else { - None - } + if pct <= 100 { Some(Percent(pct)) } else { None } } pub fn get(&self) -> u8 { diff --git a/libs/utils/src/shard.rs b/libs/utils/src/shard.rs index d98284f969..c8c410a725 100644 --- a/libs/utils/src/shard.rs +++ b/libs/utils/src/shard.rs @@ -1,6 +1,7 @@ //! See `pageserver_api::shard` for description on sharding. -use std::{ops::RangeInclusive, str::FromStr}; +use std::ops::RangeInclusive; +use std::str::FromStr; use hex::FromHex; use serde::{Deserialize, Serialize}; @@ -59,11 +60,7 @@ impl ShardCount { /// This method returns the actual number of shards, i.e. if our internal value is /// zero, we return 1 (unsharded tenants have 1 shard). pub fn count(&self) -> u8 { - if self.0 > 0 { - self.0 - } else { - 1 - } + if self.0 > 0 { self.0 } else { 1 } } /// The literal internal value: this is **not** the number of shards in the diff --git a/libs/utils/src/signals.rs b/libs/utils/src/signals.rs index c37e9aea58..f2be1957c4 100644 --- a/libs/utils/src/signals.rs +++ b/libs/utils/src/signals.rs @@ -1,7 +1,7 @@ +pub use signal_hook::consts::TERM_SIGNALS; +pub use signal_hook::consts::signal::*; use signal_hook::iterator::Signals; -pub use signal_hook::consts::{signal::*, TERM_SIGNALS}; - pub enum Signal { Quit, Interrupt, diff --git a/libs/utils/src/simple_rcu.rs b/libs/utils/src/simple_rcu.rs index 6700f86e4a..fabdf9df46 100644 --- a/libs/utils/src/simple_rcu.rs +++ b/libs/utils/src/simple_rcu.rs @@ -44,8 +44,7 @@ #![warn(missing_docs)] use std::ops::Deref; -use std::sync::{Arc, Weak}; -use std::sync::{RwLock, RwLockWriteGuard}; +use std::sync::{Arc, RwLock, RwLockWriteGuard, Weak}; use tokio::sync::watch; @@ -219,10 +218,11 @@ impl RcuWaitList { #[cfg(test)] mod tests { - use super::*; use std::sync::Mutex; use std::time::Duration; + use super::*; + #[tokio::test] async fn two_writers() { let rcu = Rcu::new(1); diff --git a/libs/utils/src/sync/gate.rs b/libs/utils/src/sync/gate.rs index 0a1ed81621..93460785bf 100644 --- a/libs/utils/src/sync/gate.rs +++ b/libs/utils/src/sync/gate.rs @@ -1,10 +1,6 @@ -use std::{ - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, - time::Duration, -}; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::Duration; /// Gates are a concurrency helper, primarily used for implementing safe shutdown. /// diff --git a/libs/utils/src/sync/heavier_once_cell.rs b/libs/utils/src/sync/heavier_once_cell.rs index 66c2065554..8f8401b35d 100644 --- a/libs/utils/src/sync/heavier_once_cell.rs +++ b/libs/utils/src/sync/heavier_once_cell.rs @@ -1,7 +1,6 @@ -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, Mutex, MutexGuard, -}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex, MutexGuard}; + use tokio::sync::Semaphore; /// Custom design like [`tokio::sync::OnceCell`] but using [`OwnedSemaphorePermit`] instead of @@ -301,14 +300,13 @@ impl Drop for InitPermit { #[cfg(test)] mod tests { + use std::convert::Infallible; + use std::pin::{Pin, pin}; + use std::time::Duration; + use futures::Future; use super::*; - use std::{ - convert::Infallible, - pin::{pin, Pin}, - time::Duration, - }; #[tokio::test] async fn many_initializers() { diff --git a/libs/utils/src/sync/spsc_fold.rs b/libs/utils/src/sync/spsc_fold.rs index 0cab291d51..7dfbf40411 100644 --- a/libs/utils/src/sync/spsc_fold.rs +++ b/libs/utils/src/sync/spsc_fold.rs @@ -1,4 +1,5 @@ -use core::{future::poll_fn, task::Poll}; +use core::future::poll_fn; +use core::task::Poll; use std::sync::{Arc, Mutex}; use diatomic_waker::DiatomicWaker; diff --git a/libs/utils/src/tcp_listener.rs b/libs/utils/src/tcp_listener.rs index 6b35d3d63a..6a4a77127d 100644 --- a/libs/utils/src/tcp_listener.rs +++ b/libs/utils/src/tcp_listener.rs @@ -1,9 +1,8 @@ -use std::{ - io, - net::{TcpListener, ToSocketAddrs}, -}; +use std::io; +use std::net::{TcpListener, ToSocketAddrs}; -use nix::sys::socket::{setsockopt, sockopt::ReuseAddr}; +use nix::sys::socket::setsockopt; +use nix::sys::socket::sockopt::ReuseAddr; /// Bind a [`TcpListener`] to addr with `SO_REUSEADDR` set to true. pub fn bind(addr: A) -> io::Result { diff --git a/libs/utils/src/tracing_span_assert.rs b/libs/utils/src/tracing_span_assert.rs index add2fa7920..3d15e08400 100644 --- a/libs/utils/src/tracing_span_assert.rs +++ b/libs/utils/src/tracing_span_assert.rs @@ -172,16 +172,14 @@ fn tracing_subscriber_configured() -> bool { #[cfg(test)] mod tests { + use std::collections::HashSet; + use std::fmt::{self}; + use std::hash::{Hash, Hasher}; + use tracing_subscriber::prelude::*; use super::*; - use std::{ - collections::HashSet, - fmt::{self}, - hash::{Hash, Hasher}, - }; - struct MemoryIdentity<'a>(&'a dyn Extractor); impl MemoryIdentity<'_> { diff --git a/libs/utils/src/try_rcu.rs b/libs/utils/src/try_rcu.rs index 6b53ab1316..30540c27d0 100644 --- a/libs/utils/src/try_rcu.rs +++ b/libs/utils/src/try_rcu.rs @@ -44,10 +44,12 @@ where #[cfg(test)] mod tests { - use super::*; - use arc_swap::ArcSwap; use std::sync::Arc; + use arc_swap::ArcSwap; + + use super::*; + #[test] fn test_try_rcu_success() { let swap = ArcSwap::from(Arc::new(42)); diff --git a/libs/utils/src/vec_map.rs b/libs/utils/src/vec_map.rs index 1fe048c6f0..eded86af3e 100644 --- a/libs/utils/src/vec_map.rs +++ b/libs/utils/src/vec_map.rs @@ -1,4 +1,6 @@ -use std::{alloc::Layout, cmp::Ordering, ops::RangeBounds}; +use std::alloc::Layout; +use std::cmp::Ordering; +use std::ops::RangeBounds; #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum VecMapOrdering { @@ -214,7 +216,8 @@ fn extract_key(entry: &(K, V)) -> &K { #[cfg(test)] mod tests { - use std::{collections::BTreeMap, ops::Bound}; + use std::collections::BTreeMap; + use std::ops::Bound; use super::{VecMap, VecMapOrdering}; diff --git a/libs/utils/src/zstd.rs b/libs/utils/src/zstd.rs index be2dcc00f5..96c2a83951 100644 --- a/libs/utils/src/zstd.rs +++ b/libs/utils/src/zstd.rs @@ -1,19 +1,14 @@ use std::io::SeekFrom; use anyhow::{Context, Result}; -use async_compression::{ - tokio::{bufread::ZstdDecoder, write::ZstdEncoder}, - zstd::CParameter, - Level, -}; +use async_compression::Level; +use async_compression::tokio::bufread::ZstdDecoder; +use async_compression::tokio::write::ZstdEncoder; +use async_compression::zstd::CParameter; use camino::Utf8Path; use nix::NixPath; -use tokio::{ - fs::{File, OpenOptions}, - io::AsyncBufRead, - io::AsyncSeekExt, - io::AsyncWriteExt, -}; +use tokio::fs::{File, OpenOptions}; +use tokio::io::{AsyncBufRead, AsyncSeekExt, AsyncWriteExt}; use tokio_tar::{Archive, Builder, HeaderMode}; use walkdir::WalkDir; diff --git a/libs/utils/tests/bin_ser_test.rs b/libs/utils/tests/bin_ser_test.rs index b995b61b78..e0c8cdde00 100644 --- a/libs/utils/tests/bin_ser_test.rs +++ b/libs/utils/tests/bin_ser_test.rs @@ -1,7 +1,8 @@ +use std::io::Read; + use bytes::{Buf, BytesMut}; use hex_literal::hex; use serde::Deserialize; -use std::io::Read; use utils::bin_ser::LeSer; #[derive(Debug, PartialEq, Eq, Deserialize)] diff --git a/libs/wal_decoder/benches/bench_interpret_wal.rs b/libs/wal_decoder/benches/bench_interpret_wal.rs index 846904cf87..ed6ba4d267 100644 --- a/libs/wal_decoder/benches/bench_interpret_wal.rs +++ b/libs/wal_decoder/benches/bench_interpret_wal.rs @@ -1,23 +1,25 @@ -use anyhow::Context; -use criterion::{criterion_group, criterion_main, Criterion}; -use futures::{stream::FuturesUnordered, StreamExt}; -use pageserver_api::shard::{ShardIdentity, ShardStripeSize}; -use postgres_ffi::{waldecoder::WalStreamDecoder, MAX_SEND_SIZE, WAL_SEGMENT_SIZE}; -use pprof::criterion::{Output, PProfProfiler}; -use serde::Deserialize; -use std::{env, num::NonZeroUsize, sync::Arc}; +use std::env; +use std::num::NonZeroUsize; +use std::sync::Arc; +use anyhow::Context; use camino::{Utf8Path, Utf8PathBuf}; use camino_tempfile::Utf8TempDir; +use criterion::{Criterion, criterion_group, criterion_main}; +use futures::StreamExt; +use futures::stream::FuturesUnordered; +use pageserver_api::shard::{ShardIdentity, ShardStripeSize}; +use postgres_ffi::waldecoder::WalStreamDecoder; +use postgres_ffi::{MAX_SEND_SIZE, WAL_SEGMENT_SIZE}; +use pprof::criterion::{Output, PProfProfiler}; use remote_storage::{ DownloadOpts, GenericRemoteStorage, ListingMode, RemoteStorageConfig, RemoteStorageKind, S3Config, }; +use serde::Deserialize; use tokio_util::sync::CancellationToken; -use utils::{ - lsn::Lsn, - shard::{ShardCount, ShardNumber}, -}; +use utils::lsn::Lsn; +use utils::shard::{ShardCount, ShardNumber}; use wal_decoder::models::InterpretedWalRecord; const S3_BUCKET: &str = "neon-github-public-dev"; @@ -31,7 +33,7 @@ const METADATA_FILENAME: &str = "metadata.json"; static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; #[allow(non_upper_case_globals)] -#[export_name = "malloc_conf"] +#[unsafe(export_name = "malloc_conf")] pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:20\0"; async fn create_s3_client() -> anyhow::Result> { diff --git a/libs/wal_decoder/src/decoder.rs b/libs/wal_decoder/src/decoder.rs index ebb38ceb52..cb0835e894 100644 --- a/libs/wal_decoder/src/decoder.rs +++ b/libs/wal_decoder/src/decoder.rs @@ -3,8 +3,6 @@ use std::collections::HashMap; -use crate::models::*; -use crate::serialized_batch::SerializedValueBatch; use bytes::{Buf, Bytes}; use pageserver_api::key::rel_block_to_key; use pageserver_api::reltag::{RelTag, SlruKind}; @@ -14,6 +12,9 @@ use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM; use postgres_ffi::walrecord::*; use utils::lsn::Lsn; +use crate::models::*; +use crate::serialized_batch::SerializedValueBatch; + impl InterpretedWalRecord { /// Decode and interpreted raw bytes which represent one Postgres WAL record. /// Data blocks which do not match any of the provided shard identities are filtered out. diff --git a/libs/wal_decoder/src/serialized_batch.rs b/libs/wal_decoder/src/serialized_batch.rs index d76f75f51f..b451d6d8e0 100644 --- a/libs/wal_decoder/src/serialized_batch.rs +++ b/libs/wal_decoder/src/serialized_batch.rs @@ -8,20 +8,18 @@ use std::collections::{BTreeSet, HashMap}; use bytes::{Bytes, BytesMut}; -use pageserver_api::key::rel_block_to_key; +use pageserver_api::key::{CompactKey, Key, rel_block_to_key}; use pageserver_api::keyspace::KeySpace; use pageserver_api::record::NeonWalRecord; use pageserver_api::reltag::RelTag; use pageserver_api::shard::ShardIdentity; -use pageserver_api::{key::CompactKey, value::Value}; +use pageserver_api::value::Value; use postgres_ffi::walrecord::{DecodedBkpBlock, DecodedWALRecord}; -use postgres_ffi::{page_is_new, page_set_lsn, pg_constants, BLCKSZ}; +use postgres_ffi::{BLCKSZ, page_is_new, page_set_lsn, pg_constants}; use serde::{Deserialize, Serialize}; use utils::bin_ser::BeSer; use utils::lsn::Lsn; -use pageserver_api::key::Key; - use crate::models::InterpretedWalRecord; static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]); @@ -515,10 +513,11 @@ impl SerializedValueBatch { let empty = self.raw.is_empty(); if cfg!(debug_assertions) && empty { - assert!(self - .metadata - .iter() - .all(|meta| matches!(meta, ValueMeta::Observed(_)))); + assert!( + self.metadata + .iter() + .all(|meta| matches!(meta, ValueMeta::Observed(_))) + ); } !empty diff --git a/libs/wal_decoder/src/wire_format.rs b/libs/wal_decoder/src/wire_format.rs index 52ed5c70b5..5a28128dd8 100644 --- a/libs/wal_decoder/src/wire_format.rs +++ b/libs/wal_decoder/src/wire_format.rs @@ -7,15 +7,12 @@ use utils::lsn::Lsn; use utils::postgres_client::{Compression, InterpretedFormat}; use crate::models::{ - FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords, MetadataRecord, + FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords, MetadataRecord, proto, }; - use crate::serialized_batch::{ ObservedValueMeta, SerializedValueBatch, SerializedValueMeta, ValueMeta, }; -use crate::models::proto; - #[derive(Debug, thiserror::Error)] pub enum ToWireFormatError { #[error("{0}")] @@ -83,8 +80,8 @@ impl ToWireFormat for InterpretedWalRecords { format: InterpretedFormat, compression: Option, ) -> Result { - use async_compression::tokio::write::ZstdEncoder; use async_compression::Level; + use async_compression::tokio::write::ZstdEncoder; let encode_res: Result = match format { InterpretedFormat::Bincode => { diff --git a/libs/walproposer/build.rs b/libs/walproposer/build.rs index 8d5b1ade35..530ceb1327 100644 --- a/libs/walproposer/build.rs +++ b/libs/walproposer/build.rs @@ -1,9 +1,11 @@ //! Links with walproposer, pgcommon, pgport and runs bindgen on walproposer.h //! to generate Rust bindings for it. -use std::{env, path::PathBuf, process::Command}; +use std::env; +use std::path::PathBuf; +use std::process::Command; -use anyhow::{anyhow, Context}; +use anyhow::{Context, anyhow}; const WALPROPOSER_PG_VERSION: &str = "v17"; diff --git a/libs/walproposer/src/api_bindings.rs b/libs/walproposer/src/api_bindings.rs index 2fbea3fe45..d660602149 100644 --- a/libs/walproposer/src/api_bindings.rs +++ b/libs/walproposer/src/api_bindings.rs @@ -3,27 +3,14 @@ #![allow(dead_code)] -use std::ffi::CStr; -use std::ffi::CString; +use std::ffi::{CStr, CString}; -use crate::bindings::uint32; -use crate::bindings::walproposer_api; -use crate::bindings::NeonWALReadResult; -use crate::bindings::PGAsyncReadResult; -use crate::bindings::PGAsyncWriteResult; -use crate::bindings::Safekeeper; -use crate::bindings::Size; -use crate::bindings::StringInfoData; -use crate::bindings::TimestampTz; -use crate::bindings::WalProposer; -use crate::bindings::WalProposerConnStatusType; -use crate::bindings::WalProposerConnectPollStatusType; -use crate::bindings::WalProposerExecStatusType; -use crate::bindings::WalproposerShmemState; -use crate::bindings::XLogRecPtr; -use crate::walproposer::ApiImpl; -use crate::walproposer::StreamingCallback; -use crate::walproposer::WaitResult; +use crate::bindings::{ + NeonWALReadResult, PGAsyncReadResult, PGAsyncWriteResult, Safekeeper, Size, StringInfoData, + TimestampTz, WalProposer, WalProposerConnStatusType, WalProposerConnectPollStatusType, + WalProposerExecStatusType, WalproposerShmemState, XLogRecPtr, uint32, walproposer_api, +}; +use crate::walproposer::{ApiImpl, StreamingCallback, WaitResult}; extern "C" fn get_shmem_state(wp: *mut WalProposer) -> *mut WalproposerShmemState { unsafe { diff --git a/libs/walproposer/src/walproposer.rs b/libs/walproposer/src/walproposer.rs index 60b606c64a..4e50c21fca 100644 --- a/libs/walproposer/src/walproposer.rs +++ b/libs/walproposer/src/walproposer.rs @@ -2,15 +2,15 @@ use std::ffi::CString; -use crate::{ - api_bindings::{create_api, take_vec_u8, Level}, - bindings::{ - NeonWALReadResult, Safekeeper, WalProposer, WalProposerBroadcast, WalProposerConfig, - WalProposerCreate, WalProposerFree, WalProposerPoll, WalProposerStart, - }, -}; use postgres_ffi::WAL_SEGMENT_SIZE; -use utils::{id::TenantTimelineId, lsn::Lsn}; +use utils::id::TenantTimelineId; +use utils::lsn::Lsn; + +use crate::api_bindings::{Level, create_api, take_vec_u8}; +use crate::bindings::{ + NeonWALReadResult, Safekeeper, WalProposer, WalProposerBroadcast, WalProposerConfig, + WalProposerCreate, WalProposerFree, WalProposerPoll, WalProposerStart, +}; /// Rust high-level wrapper for C walproposer API. Many methods are not required /// for simple cases, hence todo!() in default implementations. @@ -275,22 +275,17 @@ impl StreamingCallback { #[cfg(test)] mod tests { use core::panic; - use std::{ - cell::Cell, - ffi::CString, - sync::{atomic::AtomicUsize, mpsc::sync_channel}, - }; + use std::cell::{Cell, UnsafeCell}; + use std::ffi::CString; + use std::sync::atomic::AtomicUsize; + use std::sync::mpsc::sync_channel; - use std::cell::UnsafeCell; use utils::id::TenantTimelineId; - use crate::{ - api_bindings::Level, - bindings::{NeonWALReadResult, PG_VERSION_NUM}, - walproposer::Wrapper, - }; - use super::ApiImpl; + use crate::api_bindings::Level; + use crate::bindings::{NeonWALReadResult, PG_VERSION_NUM}; + use crate::walproposer::Wrapper; #[derive(Clone, Copy, Debug)] struct WaitEventsData { diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs index bb0f64ca32..f19b4e964d 100644 --- a/pageserver/client/src/mgmt_api.rs +++ b/pageserver/client/src/mgmt_api.rs @@ -1,17 +1,15 @@ -use std::{collections::HashMap, error::Error as _}; +use std::collections::HashMap; +use std::error::Error as _; use bytes::Bytes; -use reqwest::{IntoUrl, Method, StatusCode}; - use detach_ancestor::AncestorDetached; use http_utils::error::HttpErrorBody; -use pageserver_api::{models::*, shard::TenantShardId}; -use utils::{ - id::{TenantId, TimelineId}, - lsn::Lsn, -}; - +use pageserver_api::models::*; +use pageserver_api::shard::TenantShardId; pub use reqwest::Body as ReqwestBody; +use reqwest::{IntoUrl, Method, StatusCode}; +use utils::id::{TenantId, TimelineId}; +use utils::lsn::Lsn; use crate::BlockUnblock; diff --git a/pageserver/client/src/page_service.rs b/pageserver/client/src/page_service.rs index 47da83b0eb..ef35ac2f48 100644 --- a/pageserver/client/src/page_service.rs +++ b/pageserver/client/src/page_service.rs @@ -1,23 +1,16 @@ use std::sync::{Arc, Mutex}; -use futures::{ - stream::{SplitSink, SplitStream}, - SinkExt, StreamExt, -}; -use pageserver_api::{ - models::{ - PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest, - PagestreamGetPageResponse, - }, - reltag::RelTag, +use futures::stream::{SplitSink, SplitStream}; +use futures::{SinkExt, StreamExt}; +use pageserver_api::models::{ + PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse, }; +use pageserver_api::reltag::RelTag; use tokio::task::JoinHandle; use tokio_postgres::CopyOutStream; use tokio_util::sync::CancellationToken; -use utils::{ - id::{TenantId, TimelineId}, - lsn::Lsn, -}; +use utils::id::{TenantId, TimelineId}; +use utils::lsn::Lsn; pub struct Client { client: tokio_postgres::Client, diff --git a/pageserver/compaction/src/bin/compaction-simulator.rs b/pageserver/compaction/src/bin/compaction-simulator.rs index c308694ae1..dd35417333 100644 --- a/pageserver/compaction/src/bin/compaction-simulator.rs +++ b/pageserver/compaction/src/bin/compaction-simulator.rs @@ -1,11 +1,11 @@ -use clap::{Parser, Subcommand}; -use pageserver_compaction::helpers::PAGE_SZ; -use pageserver_compaction::simulator::MockTimeline; -use rand::Rng; use std::io::Write; use std::path::{Path, PathBuf}; use std::sync::OnceLock; +use clap::{Parser, Subcommand}; +use pageserver_compaction::helpers::PAGE_SZ; +use pageserver_compaction::simulator::MockTimeline; +use rand::Rng; use utils::project_git_version; project_git_version!(GIT_VERSION); @@ -157,8 +157,9 @@ async fn run_suite() -> anyhow::Result<()> { use std::fs::File; use std::io::Stdout; use std::sync::Mutex; -use tracing_subscriber::fmt::writer::EitherWriter; + use tracing_subscriber::fmt::MakeWriter; +use tracing_subscriber::fmt::writer::EitherWriter; static LOG_FILE: OnceLock>> = OnceLock::new(); fn get_log_output() -> &'static Mutex> { diff --git a/pageserver/compaction/src/compact_tiered.rs b/pageserver/compaction/src/compact_tiered.rs index 02b11910ce..75f43d7ff7 100644 --- a/pageserver/compaction/src/compact_tiered.rs +++ b/pageserver/compaction/src/compact_tiered.rs @@ -17,20 +17,19 @@ //! distance of image layers in LSN dimension is roughly equal to the logical //! database size. For example, if the logical database size is 10 GB, we would //! generate new image layers every 10 GB of WAL. -use futures::StreamExt; -use pageserver_api::shard::ShardIdentity; -use tracing::{debug, info}; - use std::collections::{HashSet, VecDeque}; use std::ops::Range; -use crate::helpers::{ - accum_key_values, keyspace_total_size, merge_delta_keys_buffered, overlaps_with, PAGE_SZ, -}; -use crate::interface::*; +use futures::StreamExt; +use pageserver_api::shard::ShardIdentity; +use tracing::{debug, info}; use utils::lsn::Lsn; +use crate::helpers::{ + PAGE_SZ, accum_key_values, keyspace_total_size, merge_delta_keys_buffered, overlaps_with, +}; use crate::identify_levels::identify_level; +use crate::interface::*; /// Main entry point to compaction. /// @@ -541,10 +540,11 @@ where } } // Open stream - let key_value_stream = - std::pin::pin!(merge_delta_keys_buffered::(deltas.as_slice(), ctx) + let key_value_stream = std::pin::pin!( + merge_delta_keys_buffered::(deltas.as_slice(), ctx) .await? - .map(Result::<_, anyhow::Error>::Ok)); + .map(Result::<_, anyhow::Error>::Ok) + ); let mut new_jobs = Vec::new(); // Slide a window through the keyspace diff --git a/pageserver/compaction/src/helpers.rs b/pageserver/compaction/src/helpers.rs index 7e4e3042b3..421802eef3 100644 --- a/pageserver/compaction/src/helpers.rs +++ b/pageserver/compaction/src/helpers.rs @@ -1,21 +1,21 @@ //! This file contains generic utility functions over the interface types, //! which could be handy for any compaction implementation. -use crate::interface::*; +use std::collections::{BinaryHeap, VecDeque}; +use std::fmt::Display; +use std::future::Future; +use std::ops::{DerefMut, Range}; +use std::pin::Pin; +use std::task::{Poll, ready}; use futures::future::BoxFuture; use futures::{Stream, StreamExt}; use itertools::Itertools; use pageserver_api::shard::ShardIdentity; use pin_project_lite::pin_project; -use std::collections::BinaryHeap; -use std::collections::VecDeque; -use std::fmt::Display; -use std::future::Future; -use std::ops::{DerefMut, Range}; -use std::pin::Pin; -use std::task::{ready, Poll}; use utils::lsn::Lsn; +use crate::interface::*; + pub const PAGE_SZ: u64 = 8192; pub fn keyspace_total_size( diff --git a/pageserver/compaction/src/identify_levels.rs b/pageserver/compaction/src/identify_levels.rs index e04bd15396..61575e3992 100644 --- a/pageserver/compaction/src/identify_levels.rs +++ b/pageserver/compaction/src/identify_levels.rs @@ -26,15 +26,15 @@ //! file size, the file will still be considered to be part of L0 at the next //! iteration. -use anyhow::bail; use std::collections::BTreeSet; use std::ops::Range; + +use anyhow::bail; +use tracing::{info, trace}; use utils::lsn::Lsn; use crate::interface::*; -use tracing::{info, trace}; - pub struct Level { pub lsn_range: Range, pub layers: Vec, @@ -60,7 +60,11 @@ where if l.lsn_range().start < end_lsn && l.lsn_range().end > end_lsn { // shouldn't happen. Indicates that the caller passed a bogus // end_lsn. - bail!("identify_level() called with end_lsn that does not partition the LSN space: end_lsn {} intersects with layer {}", end_lsn, l.short_id()); + bail!( + "identify_level() called with end_lsn that does not partition the LSN space: end_lsn {} intersects with layer {}", + end_lsn, + l.short_id() + ); } // include image layers sitting exacty at `end_lsn`. let is_image = !l.is_delta(); @@ -246,9 +250,10 @@ impl Level { #[cfg(test)] mod tests { + use std::sync::{Arc, Mutex}; + use super::*; use crate::simulator::{Key, MockDeltaLayer, MockImageLayer, MockLayer}; - use std::sync::{Arc, Mutex}; fn delta(key_range: Range, lsn_range: Range) -> MockLayer { MockLayer::Delta(Arc::new(MockDeltaLayer { diff --git a/pageserver/compaction/src/interface.rs b/pageserver/compaction/src/interface.rs index 92723faeaf..63fbc565cc 100644 --- a/pageserver/compaction/src/interface.rs +++ b/pageserver/compaction/src/interface.rs @@ -3,9 +3,12 @@ //! //! All the heavy lifting is done by the create_image and create_delta //! functions that the implementor provides. -use futures::Future; -use pageserver_api::{key::Key, keyspace::ShardedRange, shard::ShardIdentity}; use std::ops::Range; + +use futures::Future; +use pageserver_api::key::Key; +use pageserver_api::keyspace::ShardedRange; +use pageserver_api::shard::ShardIdentity; use utils::lsn::Lsn; /// Public interface. This is the main thing that the implementor needs to provide diff --git a/pageserver/compaction/src/simulator.rs b/pageserver/compaction/src/simulator.rs index 341fceba6f..bf9f6f2658 100644 --- a/pageserver/compaction/src/simulator.rs +++ b/pageserver/compaction/src/simulator.rs @@ -1,22 +1,17 @@ mod draw; -use draw::{LayerTraceEvent, LayerTraceFile, LayerTraceOp}; +use std::fmt::Write; +use std::ops::Range; +use std::sync::{Arc, Mutex}; +use draw::{LayerTraceEvent, LayerTraceFile, LayerTraceOp}; use futures::StreamExt; use pageserver_api::shard::ShardIdentity; use rand::Rng; use tracing::info; - use utils::lsn::Lsn; -use std::fmt::Write; -use std::ops::Range; -use std::sync::Arc; -use std::sync::Mutex; - -use crate::helpers::PAGE_SZ; -use crate::helpers::{merge_delta_keys, overlaps_with}; - +use crate::helpers::{PAGE_SZ, merge_delta_keys, overlaps_with}; use crate::interface; use crate::interface::CompactionLayer; diff --git a/pageserver/compaction/src/simulator/draw.rs b/pageserver/compaction/src/simulator/draw.rs index 4559db09f1..3d35d1b91e 100644 --- a/pageserver/compaction/src/simulator/draw.rs +++ b/pageserver/compaction/src/simulator/draw.rs @@ -1,14 +1,14 @@ -use super::Key; -use anyhow::Result; use std::cmp::Ordering; -use std::{ - collections::{BTreeMap, BTreeSet, HashSet}, - fmt::Write, - ops::Range, -}; -use svg_fmt::{rgb, BeginSvg, EndSvg, Fill, Stroke, Style}; +use std::collections::{BTreeMap, BTreeSet, HashSet}; +use std::fmt::Write; +use std::ops::Range; + +use anyhow::Result; +use svg_fmt::{BeginSvg, EndSvg, Fill, Stroke, Style, rgb}; use utils::lsn::Lsn; +use super::Key; + // Map values to their compressed coordinate - the index the value // would have in a sorted and deduplicated list of all values. struct CoordinateMap { diff --git a/pageserver/ctl/src/draw_timeline_dir.rs b/pageserver/ctl/src/draw_timeline_dir.rs index 177e65ef79..80ca414543 100644 --- a/pageserver/ctl/src/draw_timeline_dir.rs +++ b/pageserver/ctl/src/draw_timeline_dir.rs @@ -50,18 +50,18 @@ //! ``` //! -use anyhow::{Context, Result}; -use pageserver_api::key::Key; use std::cmp::Ordering; +use std::collections::{BTreeMap, BTreeSet}; use std::io::{self, BufRead}; +use std::ops::Range; use std::path::PathBuf; use std::str::FromStr; -use std::{ - collections::{BTreeMap, BTreeSet}, - ops::Range, -}; -use svg_fmt::{rectangle, rgb, BeginSvg, EndSvg, Fill, Stroke}; -use utils::{lsn::Lsn, project_git_version}; + +use anyhow::{Context, Result}; +use pageserver_api::key::Key; +use svg_fmt::{BeginSvg, EndSvg, Fill, Stroke, rectangle, rgb}; +use utils::lsn::Lsn; +use utils::project_git_version; project_git_version!(GIT_VERSION); diff --git a/pageserver/ctl/src/key.rs b/pageserver/ctl/src/key.rs index c7f0719c41..600f7c412e 100644 --- a/pageserver/ctl/src/key.rs +++ b/pageserver/ctl/src/key.rs @@ -1,11 +1,10 @@ +use std::str::FromStr; + use anyhow::Context; use clap::Parser; -use pageserver_api::{ - key::Key, - reltag::{BlockNumber, RelTag, SlruKind}, - shard::{ShardCount, ShardStripeSize}, -}; -use std::str::FromStr; +use pageserver_api::key::Key; +use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind}; +use pageserver_api::shard::{ShardCount, ShardStripeSize}; #[derive(Parser)] pub(super) struct DescribeKeyCommand { @@ -394,7 +393,10 @@ mod tests { fn single_positional_spanalike_is_key_material() { // why is this needed? if you are checking many, then copypaste starts to appeal let strings = [ - (line!(), "2024-05-15T15:33:49.873906Z ERROR page_service_conn_main{peer_addr=A:B}:process_query{tenant_id=C timeline_id=D}:handle_pagerequests:handle_get_page_at_lsn_request{rel=1663/208101/2620_fsm blkno=2 req_lsn=0/238D98C8}: error reading relation or page version: Read error: could not find data for key 000000067F00032CE5000000000000000001 (shard ShardNumber(0)) at LSN 0/1D0A16C1, request LSN 0/238D98C8, ancestor 0/0"), + ( + line!(), + "2024-05-15T15:33:49.873906Z ERROR page_service_conn_main{peer_addr=A:B}:process_query{tenant_id=C timeline_id=D}:handle_pagerequests:handle_get_page_at_lsn_request{rel=1663/208101/2620_fsm blkno=2 req_lsn=0/238D98C8}: error reading relation or page version: Read error: could not find data for key 000000067F00032CE5000000000000000001 (shard ShardNumber(0)) at LSN 0/1D0A16C1, request LSN 0/238D98C8, ancestor 0/0", + ), (line!(), "rel=1663/208101/2620_fsm blkno=2"), (line!(), "rel=1663/208101/2620.1 blkno=2"), ]; @@ -420,7 +422,15 @@ mod tests { #[test] fn multiple_spanlike_args() { let strings = [ - (line!(), &["process_query{tenant_id=C", "timeline_id=D}:handle_pagerequests:handle_get_page_at_lsn_request{rel=1663/208101/2620_fsm", "blkno=2", "req_lsn=0/238D98C8}"][..]), + ( + line!(), + &[ + "process_query{tenant_id=C", + "timeline_id=D}:handle_pagerequests:handle_get_page_at_lsn_request{rel=1663/208101/2620_fsm", + "blkno=2", + "req_lsn=0/238D98C8}", + ][..], + ), (line!(), &["rel=1663/208101/2620_fsm", "blkno=2"][..]), (line!(), &["1663/208101/2620_fsm", "2"][..]), ]; diff --git a/pageserver/ctl/src/layer_map_analyzer.rs b/pageserver/ctl/src/layer_map_analyzer.rs index 2c350d6d86..b426f977cf 100644 --- a/pageserver/ctl/src/layer_map_analyzer.rs +++ b/pageserver/ctl/src/layer_map_analyzer.rs @@ -2,27 +2,27 @@ //! //! Currently it only analyzes holes, which are regions within the layer range that the layer contains no updates for. In the future it might do more analysis (maybe key quantiles?) but it should never return sensitive data. -use anyhow::{anyhow, Result}; -use camino::{Utf8Path, Utf8PathBuf}; -use pageserver::context::{DownloadBehavior, RequestContext}; -use pageserver::task_mgr::TaskKind; -use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME}; -use pageserver::virtual_file::api::IoMode; use std::cmp::Ordering; use std::collections::BinaryHeap; use std::ops::Range; use std::str::FromStr; use std::{fs, str}; +use anyhow::{Result, anyhow}; +use camino::{Utf8Path, Utf8PathBuf}; +use pageserver::context::{DownloadBehavior, RequestContext}; use pageserver::page_cache::{self, PAGE_SZ}; +use pageserver::task_mgr::TaskKind; use pageserver::tenant::block_io::FileBlockReader; use pageserver::tenant::disk_btree::{DiskBtreeReader, VisitDirection}; -use pageserver::tenant::storage_layer::delta_layer::{Summary, DELTA_KEY_SIZE}; -use pageserver::tenant::storage_layer::{range_overlaps, LayerName}; +use pageserver::tenant::storage_layer::delta_layer::{DELTA_KEY_SIZE, Summary}; +use pageserver::tenant::storage_layer::{LayerName, range_overlaps}; +use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME}; +use pageserver::virtual_file::api::IoMode; use pageserver::virtual_file::{self, VirtualFile}; -use pageserver_api::key::{Key, KEY_SIZE}; - -use utils::{bin_ser::BeSer, lsn::Lsn}; +use pageserver_api::key::{KEY_SIZE, Key}; +use utils::bin_ser::BeSer; +use utils::lsn::Lsn; use crate::AnalyzeLayerMapCmd; diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index 4c2c3ab30e..05fb35ff09 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -1,3 +1,4 @@ +use std::fs::{self, File}; use std::path::{Path, PathBuf}; use anyhow::Result; @@ -5,12 +6,10 @@ use camino::{Utf8Path, Utf8PathBuf}; use clap::Subcommand; use pageserver::context::{DownloadBehavior, RequestContext}; use pageserver::task_mgr::TaskKind; -use pageserver::tenant::storage_layer::{delta_layer, image_layer}; -use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer}; +use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer, delta_layer, image_layer}; use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME}; use pageserver::virtual_file::api::IoMode; use pageserver::{page_cache, virtual_file}; -use std::fs::{self, File}; use utils::id::{TenantId, TimelineId}; use crate::layer_map_analyzer::parse_filename; diff --git a/pageserver/ctl/src/main.rs b/pageserver/ctl/src/main.rs index 353b4bd2f9..72a120a69b 100644 --- a/pageserver/ctl/src/main.rs +++ b/pageserver/ctl/src/main.rs @@ -11,33 +11,29 @@ mod layer_map_analyzer; mod layers; mod page_trace; -use page_trace::PageTraceCmd; -use std::{ - str::FromStr, - time::{Duration, SystemTime}, -}; +use std::str::FromStr; +use std::time::{Duration, SystemTime}; use camino::{Utf8Path, Utf8PathBuf}; use clap::{Parser, Subcommand}; use index_part::IndexPartCmd; use layers::LayerCmd; -use pageserver::{ - context::{DownloadBehavior, RequestContext}, - page_cache, - task_mgr::TaskKind, - tenant::{dump_layerfile_from_path, metadata::TimelineMetadata}, - virtual_file::{self, api::IoMode}, -}; +use page_trace::PageTraceCmd; +use pageserver::context::{DownloadBehavior, RequestContext}; +use pageserver::page_cache; +use pageserver::task_mgr::TaskKind; +use pageserver::tenant::dump_layerfile_from_path; +use pageserver::tenant::metadata::TimelineMetadata; +use pageserver::virtual_file::api::IoMode; +use pageserver::virtual_file::{self}; use pageserver_api::shard::TenantShardId; use postgres_ffi::ControlFileData; use remote_storage::{RemotePath, RemoteStorageConfig}; use tokio_util::sync::CancellationToken; -use utils::{ - id::TimelineId, - logging::{self, LogFormat, TracingErrorLayerEnablement}, - lsn::Lsn, - project_git_version, -}; +use utils::id::TimelineId; +use utils::logging::{self, LogFormat, TracingErrorLayerEnablement}; +use utils::lsn::Lsn; +use utils::project_git_version; project_git_version!(GIT_VERSION); @@ -355,7 +351,9 @@ mod tests { assert_valid("pageserver/v1/tenants/3aa8fcc61f6d357410b7de754b1d9001/timelines"); assert_valid("pageserver/v1/tenants/3aa8fcc61f6d357410b7de754b1d9001-0004/timelines"); assert_valid("pageserver/v1/tenants/3aa8fcc61f6d357410b7de754b1d9001/timelines/"); - assert_valid("pageserver/v1/tenants/3aa8fcc61f6d357410b7de754b1d9001/timelines/641e5342083b2235ee3deb8066819683"); + assert_valid( + "pageserver/v1/tenants/3aa8fcc61f6d357410b7de754b1d9001/timelines/641e5342083b2235ee3deb8066819683", + ); assert_eq!(validate_prefix("pageserver/v1/tenants/"), None); } } diff --git a/pageserver/pagebench/src/cmd/aux_files.rs b/pageserver/pagebench/src/cmd/aux_files.rs index b869a0c6c7..bab17540f5 100644 --- a/pageserver/pagebench/src/cmd/aux_files.rs +++ b/pageserver/pagebench/src/cmd/aux_files.rs @@ -1,12 +1,12 @@ +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Instant; + use pageserver_api::models::{TenantConfig, TenantConfigRequest}; use pageserver_api::shard::TenantShardId; use utils::id::TenantTimelineId; use utils::lsn::Lsn; -use std::collections::HashMap; -use std::sync::Arc; -use std::time::Instant; - /// Ingest aux files into the pageserver. #[derive(clap::Parser)] pub(crate) struct Args { diff --git a/pageserver/pagebench/src/cmd/basebackup.rs b/pageserver/pagebench/src/cmd/basebackup.rs index 3ae6d99aa7..51d7d5df89 100644 --- a/pageserver/pagebench/src/cmd/basebackup.rs +++ b/pageserver/pagebench/src/cmd/basebackup.rs @@ -1,16 +1,3 @@ -use anyhow::Context; -use pageserver_api::shard::TenantShardId; -use pageserver_client::mgmt_api::ForceAwaitLogicalSize; -use pageserver_client::page_service::BasebackupRequest; - -use utils::id::TenantTimelineId; -use utils::lsn::Lsn; - -use rand::prelude::*; -use tokio::sync::Barrier; -use tokio::task::JoinSet; -use tracing::{info, instrument}; - use std::collections::HashMap; use std::num::NonZeroUsize; use std::ops::Range; @@ -18,6 +5,17 @@ use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Instant; +use anyhow::Context; +use pageserver_api::shard::TenantShardId; +use pageserver_client::mgmt_api::ForceAwaitLogicalSize; +use pageserver_client::page_service::BasebackupRequest; +use rand::prelude::*; +use tokio::sync::Barrier; +use tokio::task::JoinSet; +use tracing::{info, instrument}; +use utils::id::TenantTimelineId; +use utils::lsn::Lsn; + use crate::util::tokio_thread_local_stats::AllThreadLocalStats; use crate::util::{request_stats, tokio_thread_local_stats}; diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index a60efc7567..617676c079 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -1,18 +1,3 @@ -use anyhow::Context; -use camino::Utf8PathBuf; -use pageserver_api::key::Key; -use pageserver_api::keyspace::KeySpaceAccum; -use pageserver_api::models::{PagestreamGetPageRequest, PagestreamRequest}; - -use pageserver_api::shard::TenantShardId; -use tokio_util::sync::CancellationToken; -use utils::id::TenantTimelineId; -use utils::lsn::Lsn; - -use rand::prelude::*; -use tokio::task::JoinSet; -use tracing::info; - use std::collections::{HashSet, VecDeque}; use std::future::Future; use std::num::NonZeroUsize; @@ -21,6 +6,19 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; +use anyhow::Context; +use camino::Utf8PathBuf; +use pageserver_api::key::Key; +use pageserver_api::keyspace::KeySpaceAccum; +use pageserver_api::models::{PagestreamGetPageRequest, PagestreamRequest}; +use pageserver_api::shard::TenantShardId; +use rand::prelude::*; +use tokio::task::JoinSet; +use tokio_util::sync::CancellationToken; +use tracing::info; +use utils::id::TenantTimelineId; +use utils::lsn::Lsn; + use crate::util::tokio_thread_local_stats::AllThreadLocalStats; use crate::util::{request_stats, tokio_thread_local_stats}; diff --git a/pageserver/pagebench/src/cmd/ondemand_download_churn.rs b/pageserver/pagebench/src/cmd/ondemand_download_churn.rs index 1bb71b9353..3194e2e753 100644 --- a/pageserver/pagebench/src/cmd/ondemand_download_churn.rs +++ b/pageserver/pagebench/src/cmd/ondemand_download_churn.rs @@ -1,23 +1,19 @@ -use pageserver_api::{models::HistoricLayerInfo, shard::TenantShardId}; +use std::f64; +use std::num::NonZeroUsize; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::{Duration, Instant}; +use pageserver_api::models::HistoricLayerInfo; +use pageserver_api::shard::TenantShardId; use pageserver_client::mgmt_api; use rand::seq::SliceRandom; +use tokio::sync::{OwnedSemaphorePermit, mpsc}; +use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::{debug, info}; use utils::id::{TenantTimelineId, TimelineId}; -use std::{f64, sync::Arc}; -use tokio::{ - sync::{mpsc, OwnedSemaphorePermit}, - task::JoinSet, -}; - -use std::{ - num::NonZeroUsize, - sync::atomic::{AtomicU64, Ordering}, - time::{Duration, Instant}, -}; - /// Evict & on-demand download random layers. #[derive(clap::Parser)] pub(crate) struct Args { diff --git a/pageserver/pagebench/src/cmd/trigger_initial_size_calculation.rs b/pageserver/pagebench/src/cmd/trigger_initial_size_calculation.rs index f07beeecfd..16abbf9ffd 100644 --- a/pageserver/pagebench/src/cmd/trigger_initial_size_calculation.rs +++ b/pageserver/pagebench/src/cmd/trigger_initial_size_calculation.rs @@ -2,11 +2,10 @@ use std::sync::Arc; use humantime::Duration; use pageserver_api::shard::TenantShardId; +use pageserver_client::mgmt_api::ForceAwaitLogicalSize; use tokio::task::JoinSet; use utils::id::TenantTimelineId; -use pageserver_client::mgmt_api::ForceAwaitLogicalSize; - #[derive(clap::Parser)] pub(crate) struct Args { #[clap(long, default_value = "http://localhost:9898")] diff --git a/safekeeper/client/src/mgmt_api.rs b/safekeeper/client/src/mgmt_api.rs index 5c305769dd..0e92e87103 100644 --- a/safekeeper/client/src/mgmt_api.rs +++ b/safekeeper/client/src/mgmt_api.rs @@ -3,17 +3,16 @@ //! Partially copied from pageserver client; some parts might be better to be //! united. +use std::error::Error as _; + use http_utils::error::HttpErrorBody; use reqwest::{IntoUrl, Method, StatusCode}; use safekeeper_api::models::{ PullTimelineRequest, PullTimelineResponse, SafekeeperUtilization, TimelineCreateRequest, TimelineStatus, }; -use std::error::Error as _; -use utils::{ - id::{NodeId, TenantId, TimelineId}, - logging::SecretString, -}; +use utils::id::{NodeId, TenantId, TimelineId}; +use utils::logging::SecretString; #[derive(Debug, Clone)] pub struct Client { diff --git a/storage_controller/client/src/control_api.rs b/storage_controller/client/src/control_api.rs index f8a2790769..7888b18aa7 100644 --- a/storage_controller/client/src/control_api.rs +++ b/storage_controller/client/src/control_api.rs @@ -1,6 +1,7 @@ use pageserver_client::mgmt_api::{self, ResponseErrorMessageExt}; use reqwest::{Method, Url}; -use serde::{de::DeserializeOwned, Serialize}; +use serde::Serialize; +use serde::de::DeserializeOwned; pub struct Client { base_url: Url, diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 8671e340bd..9a3e042c24 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -1856,7 +1856,7 @@ impl Service { } Ok(AttachHookResponse { - r#gen: attach_req + generation: attach_req .node_id .map(|_| tenant_shard.generation.expect("Test hook, not used on tenants that are mid-onboarding with a NULL generation").into().unwrap()), })