From f94286f0c929dce82c8903b29688b5bd230b72aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Wed, 26 Feb 2025 14:12:26 +0100 Subject: [PATCH] Upgrade compute_tools and compute_api to edition 2024 (#10983) Updates `compute_tools` and `compute_api` crates to edition 2024. We like to stay on the latest edition if possible. There is no functional changes, however some code changes had to be done to accommodate the edition's breaking changes. The PR has three commits: * the first commit updates the named crates to edition 2024 and appeases `cargo clippy` by changing code. * the second commit performs a `cargo fmt` that does some minor changes (not many) * the third commit performs a cargo fmt with nightly options to reorder imports as a one-time thing. it's completely optional, but I offer it here for the compute team to review it. I'd like to hear opinions about the third commit, if it's wanted and felt worth the diff or not. I think most attention should be put onto the first commit. Part of #10918 --- compute_tools/Cargo.toml | 2 +- compute_tools/src/bin/compute_ctl.rs | 29 ++++++----- compute_tools/src/bin/fast_import.rs | 10 ++-- .../src/bin/fast_import/aws_s3_sync.rs | 3 +- compute_tools/src/bin/fast_import/s3_uri.rs | 3 +- compute_tools/src/catalog.rs | 18 +++---- compute_tools/src/checker.rs | 2 +- compute_tools/src/compute.rs | 48 +++++++++---------- compute_tools/src/config.rs | 5 +- compute_tools/src/configurator.rs | 3 +- compute_tools/src/extension_server.rs | 13 +++-- compute_tools/src/http/extract/json.rs | 3 +- compute_tools/src/http/extract/path.rs | 6 ++- compute_tools/src/http/extract/query.rs | 6 ++- compute_tools/src/http/mod.rs | 6 ++- .../src/http/routes/check_writability.rs | 7 ++- compute_tools/src/http/routes/configure.rs | 16 +++---- .../src/http/routes/database_schema.rs | 16 ++++--- .../src/http/routes/dbs_and_roles.rs | 7 ++- .../src/http/routes/extension_server.rs | 16 ++----- compute_tools/src/http/routes/extensions.rs | 16 +++---- compute_tools/src/http/routes/failpoints.rs | 3 +- compute_tools/src/http/routes/grants.rs | 16 +++---- compute_tools/src/http/routes/insights.rs | 6 ++- compute_tools/src/http/routes/metrics.rs | 8 ++-- compute_tools/src/http/routes/metrics_json.rs | 6 ++- compute_tools/src/http/routes/status.rs | 10 ++-- compute_tools/src/http/routes/terminate.rs | 12 ++--- compute_tools/src/http/server.rs | 27 +++++------ compute_tools/src/installed_extensions.rs | 2 +- compute_tools/src/lsn_lease.rs | 20 ++++---- compute_tools/src/metrics.rs | 2 +- compute_tools/src/monitor.rs | 7 +-- compute_tools/src/pg_helpers.rs | 5 +- compute_tools/src/spec.rs | 20 ++++---- compute_tools/src/spec_apply.rs | 15 +++--- compute_tools/src/swap.rs | 2 +- compute_tools/tests/config_test.rs | 2 +- libs/compute_api/Cargo.toml | 2 +- libs/compute_api/src/requests.rs | 9 ++-- libs/compute_api/src/responses.rs | 6 +-- libs/compute_api/src/spec.rs | 8 ++-- 42 files changed, 215 insertions(+), 208 deletions(-) diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index c276996df5..ba2c304141 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "compute_tools" version = "0.1.0" -edition.workspace = true +edition = "2024" license.workspace = true [features] diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 1cdae718fe..efe707cb7c 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -40,34 +40,33 @@ use std::path::Path; use std::process::exit; use std::str::FromStr; use std::sync::atomic::Ordering; -use std::sync::{mpsc, Arc, Condvar, Mutex, RwLock}; -use std::{thread, time::Duration}; +use std::sync::{Arc, Condvar, Mutex, RwLock, mpsc}; +use std::thread; +use std::time::Duration; use anyhow::{Context, Result}; use chrono::Utc; use clap::Parser; -use compute_tools::disk_quota::set_disk_quota; -use compute_tools::http::server::Server; -use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static; -use signal_hook::consts::{SIGQUIT, SIGTERM}; -use signal_hook::{consts::SIGINT, iterator::Signals}; -use tracing::{error, info, warn}; -use url::Url; - use compute_api::responses::{ComputeCtlConfig, ComputeStatus}; use compute_api::spec::ComputeSpec; - use compute_tools::compute::{ - forward_termination_signal, ComputeNode, ComputeState, ParsedSpec, PG_PID, + ComputeNode, ComputeState, PG_PID, ParsedSpec, forward_termination_signal, }; use compute_tools::configurator::launch_configurator; +use compute_tools::disk_quota::set_disk_quota; use compute_tools::extension_server::get_pg_version_string; +use compute_tools::http::server::Server; use compute_tools::logger::*; +use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static; use compute_tools::monitor::launch_monitor; use compute_tools::params::*; use compute_tools::spec::*; use compute_tools::swap::resize_swap; -use rlimit::{setrlimit, Resource}; +use rlimit::{Resource, setrlimit}; +use signal_hook::consts::{SIGINT, SIGQUIT, SIGTERM}; +use signal_hook::iterator::Signals; +use tracing::{error, info, warn}; +use url::Url; use utils::failpoint_support; // this is an arbitrary build tag. Fine as a default / for testing purposes @@ -149,6 +148,8 @@ struct Cli { fn main() -> Result<()> { let cli = Cli::parse(); + let scenario = failpoint_support::init(); + // For historical reasons, the main thread that processes the spec and launches postgres // is synchronous, but we always have this tokio runtime available and we "enter" it so // that you can use tokio::spawn() and tokio::runtime::Handle::current().block_on(...) @@ -160,8 +161,6 @@ fn main() -> Result<()> { let build_tag = runtime.block_on(init())?; - let scenario = failpoint_support::init(); - // enable core dumping for all child processes setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?; diff --git a/compute_tools/src/bin/fast_import.rs b/compute_tools/src/bin/fast_import.rs index 585f3e4e1d..47558be7a0 100644 --- a/compute_tools/src/bin/fast_import.rs +++ b/compute_tools/src/bin/fast_import.rs @@ -25,13 +25,13 @@ //! docker push localhost:3030/localregistry/compute-node-v14:latest //! ``` -use anyhow::{bail, Context}; +use anyhow::{Context, bail}; use aws_config::BehaviorVersion; use camino::{Utf8Path, Utf8PathBuf}; use clap::{Parser, Subcommand}; -use compute_tools::extension_server::{get_pg_version, PostgresMajorVersion}; +use compute_tools::extension_server::{PostgresMajorVersion, get_pg_version}; use nix::unistd::Pid; -use tracing::{error, info, info_span, warn, Instrument}; +use tracing::{Instrument, error, info, info_span, warn}; use utils::fs_ext::is_directory_empty; #[path = "fast_import/aws_s3_sync.rs"] @@ -558,7 +558,9 @@ async fn cmd_dumprestore( decode_connstring(kms_client.as_ref().unwrap(), &key_id, dest_ciphertext) .await? } else { - bail!("destination connection string must be provided in spec for dump_restore command"); + bail!( + "destination connection string must be provided in spec for dump_restore command" + ); }; (source, dest) diff --git a/compute_tools/src/bin/fast_import/aws_s3_sync.rs b/compute_tools/src/bin/fast_import/aws_s3_sync.rs index 1be10b36d6..d8d007da71 100644 --- a/compute_tools/src/bin/fast_import/aws_s3_sync.rs +++ b/compute_tools/src/bin/fast_import/aws_s3_sync.rs @@ -1,11 +1,10 @@ use camino::{Utf8Path, Utf8PathBuf}; use tokio::task::JoinSet; +use tracing::{info, warn}; use walkdir::WalkDir; use super::s3_uri::S3Uri; -use tracing::{info, warn}; - const MAX_PARALLEL_UPLOADS: usize = 10; /// Upload all files from 'local' to 'remote' diff --git a/compute_tools/src/bin/fast_import/s3_uri.rs b/compute_tools/src/bin/fast_import/s3_uri.rs index 52bbef420f..cf4dab7c02 100644 --- a/compute_tools/src/bin/fast_import/s3_uri.rs +++ b/compute_tools/src/bin/fast_import/s3_uri.rs @@ -1,6 +1,7 @@ -use anyhow::Result; use std::str::FromStr; +use anyhow::Result; + /// Struct to hold parsed S3 components #[derive(Debug, Clone, PartialEq, Eq)] pub struct S3Uri { diff --git a/compute_tools/src/catalog.rs b/compute_tools/src/catalog.rs index 28b10ce21c..2a7f56e6fc 100644 --- a/compute_tools/src/catalog.rs +++ b/compute_tools/src/catalog.rs @@ -1,18 +1,20 @@ +use std::path::Path; +use std::process::Stdio; +use std::result::Result; +use std::sync::Arc; + +use compute_api::responses::CatalogObjects; use futures::Stream; use postgres::NoTls; -use std::{path::Path, process::Stdio, result::Result, sync::Arc}; -use tokio::{ - io::{AsyncBufReadExt, BufReader}, - process::Command, - spawn, -}; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::process::Command; +use tokio::spawn; use tokio_stream::{self as stream, StreamExt}; use tokio_util::codec::{BytesCodec, FramedRead}; use tracing::warn; use crate::compute::ComputeNode; use crate::pg_helpers::{get_existing_dbs_async, get_existing_roles_async, postgres_conf_for_db}; -use compute_api::responses::CatalogObjects; pub async fn get_dbs_and_roles(compute: &Arc) -> anyhow::Result { let conf = compute.get_tokio_conn_conf(Some("compute_ctl:get_dbs_and_roles")); @@ -55,7 +57,7 @@ pub enum SchemaDumpError { pub async fn get_database_schema( compute: &Arc, dbname: &str, -) -> Result>, SchemaDumpError> { +) -> Result> + use<>, SchemaDumpError> { let pgbin = &compute.pgbin; let basepath = Path::new(pgbin).parent().unwrap(); let pgdump = basepath.join("pg_dump"); diff --git a/compute_tools/src/checker.rs b/compute_tools/src/checker.rs index 62d61a8bc9..e4207876ac 100644 --- a/compute_tools/src/checker.rs +++ b/compute_tools/src/checker.rs @@ -1,4 +1,4 @@ -use anyhow::{anyhow, Ok, Result}; +use anyhow::{Ok, Result, anyhow}; use tokio_postgres::NoTls; use tracing::{error, instrument, warn}; diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 6d882ce997..e3c70ba622 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -1,42 +1,37 @@ use std::collections::{HashMap, HashSet}; -use std::env; -use std::fs; use std::iter::once; -use std::os::unix::fs::{symlink, PermissionsExt}; +use std::os::unix::fs::{PermissionsExt, symlink}; use std::path::Path; use std::process::{Command, Stdio}; use std::str::FromStr; -use std::sync::atomic::AtomicU32; -use std::sync::atomic::Ordering; +use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Condvar, Mutex, RwLock}; -use std::time::Duration; -use std::time::Instant; +use std::time::{Duration, Instant}; +use std::{env, fs}; use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; -use compute_api::spec::{Database, PgIdent, Role}; +use compute_api::privilege::Privilege; +use compute_api::responses::{ComputeMetrics, ComputeStatus}; +use compute_api::spec::{ + ComputeFeature, ComputeMode, ComputeSpec, Database, ExtVersion, PgIdent, Role, +}; +use futures::StreamExt; use futures::future::join_all; use futures::stream::FuturesUnordered; -use futures::StreamExt; +use nix::sys::signal::{Signal, kill}; use nix::unistd::Pid; use postgres; -use postgres::error::SqlState; use postgres::NoTls; +use postgres::error::SqlState; +use remote_storage::{DownloadError, RemotePath}; +use tokio::spawn; use tracing::{debug, error, info, instrument, warn}; use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; - -use compute_api::privilege::Privilege; -use compute_api::responses::{ComputeMetrics, ComputeStatus}; -use compute_api::spec::{ComputeFeature, ComputeMode, ComputeSpec, ExtVersion}; use utils::measured_stream::MeasuredReader; -use nix::sys::signal::{kill, Signal}; -use remote_storage::{DownloadError, RemotePath}; -use tokio::spawn; - use crate::installed_extensions::get_installed_extensions; -use crate::local_proxy; use crate::pg_helpers::*; use crate::spec::*; use crate::spec_apply::ApplySpecPhase::{ @@ -45,13 +40,12 @@ use crate::spec_apply::ApplySpecPhase::{ HandleNeonExtension, HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles, RunInEachDatabase, }; -use crate::spec_apply::PerDatabasePhase; use crate::spec_apply::PerDatabasePhase::{ ChangeSchemaPerms, DeleteDBRoleReferences, DropLogicalSubscriptions, HandleAnonExtension, }; -use crate::spec_apply::{apply_operations, MutableApplyContext, DB}; +use crate::spec_apply::{DB, MutableApplyContext, PerDatabasePhase, apply_operations}; use crate::sync_sk::{check_if_synced, ping_safekeeper}; -use crate::{config, extension_server}; +use crate::{config, extension_server, local_proxy}; pub static SYNC_SAFEKEEPERS_PID: AtomicU32 = AtomicU32::new(0); pub static PG_PID: AtomicU32 = AtomicU32::new(0); @@ -1318,7 +1312,7 @@ impl ComputeNode { // Merge-apply spec & changes to PostgreSQL state. self.apply_spec_sql(spec.clone(), conf.clone(), max_concurrent_connections)?; - if let Some(ref local_proxy) = &spec.clone().local_proxy_config { + if let Some(local_proxy) = &spec.clone().local_proxy_config { info!("configuring local_proxy"); local_proxy::configure(local_proxy).context("apply_config local_proxy")?; } @@ -1538,7 +1532,9 @@ impl ComputeNode { &postgresql_conf_path, "neon.disable_logical_replication_subscribers=false", )? { - info!("updated postgresql.conf to set neon.disable_logical_replication_subscribers=false"); + info!( + "updated postgresql.conf to set neon.disable_logical_replication_subscribers=false" + ); } self.pg_reload_conf()?; } @@ -1765,7 +1761,9 @@ LIMIT 100", info!("extension already downloaded, skipping re-download"); return Ok(0); } else if start_time_delta < HANG_TIMEOUT && !first_try { - info!("download {ext_archive_name} already started by another process, hanging untill completion or timeout"); + info!( + "download {ext_archive_name} already started by another process, hanging untill completion or timeout" + ); let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(500)); loop { info!("waiting for download"); diff --git a/compute_tools/src/config.rs b/compute_tools/src/config.rs index e1bdfffa54..e8056ec7eb 100644 --- a/compute_tools/src/config.rs +++ b/compute_tools/src/config.rs @@ -4,11 +4,10 @@ use std::io::prelude::*; use std::path::Path; use anyhow::Result; - -use crate::pg_helpers::escape_conf_value; -use crate::pg_helpers::{GenericOptionExt, PgOptionsSerialize}; use compute_api::spec::{ComputeMode, ComputeSpec, GenericOption}; +use crate::pg_helpers::{GenericOptionExt, PgOptionsSerialize, escape_conf_value}; + /// Check that `line` is inside a text file and put it there if it is not. /// Create file if it doesn't exist. pub fn line_in_file(path: &Path, line: &str) -> Result { diff --git a/compute_tools/src/configurator.rs b/compute_tools/src/configurator.rs index d88f26ca20..d97bd37285 100644 --- a/compute_tools/src/configurator.rs +++ b/compute_tools/src/configurator.rs @@ -1,9 +1,8 @@ use std::sync::Arc; use std::thread; -use tracing::{error, info, instrument}; - use compute_api::responses::ComputeStatus; +use tracing::{error, info, instrument}; use crate::compute::ComputeNode; diff --git a/compute_tools/src/extension_server.rs b/compute_tools/src/extension_server.rs index 00f46386e7..77e98359ab 100644 --- a/compute_tools/src/extension_server.rs +++ b/compute_tools/src/extension_server.rs @@ -71,15 +71,15 @@ More specifically, here is an example ext_index.json } } */ -use anyhow::Result; -use anyhow::{bail, Context}; +use std::path::Path; +use std::str; + +use anyhow::{Context, Result, bail}; use bytes::Bytes; use compute_api::spec::RemoteExtSpec; use regex::Regex; use remote_storage::*; use reqwest::StatusCode; -use std::path::Path; -use std::str; use tar::Archive; use tracing::info; use tracing::log::warn; @@ -244,7 +244,10 @@ pub fn create_control_files(remote_extensions: &RemoteExtSpec, pgbin: &str) { info!("writing file {:?}{:?}", control_path, control_content); std::fs::write(control_path, control_content).unwrap(); } else { - warn!("control file {:?} exists both locally and remotely. ignoring the remote version.", control_path); + warn!( + "control file {:?} exists both locally and remotely. ignoring the remote version.", + control_path + ); } } } diff --git a/compute_tools/src/http/extract/json.rs b/compute_tools/src/http/extract/json.rs index 104cc25d5f..1d32e4ff37 100644 --- a/compute_tools/src/http/extract/json.rs +++ b/compute_tools/src/http/extract/json.rs @@ -1,6 +1,7 @@ use std::ops::{Deref, DerefMut}; -use axum::extract::{rejection::JsonRejection, FromRequest, Request}; +use axum::extract::rejection::JsonRejection; +use axum::extract::{FromRequest, Request}; use compute_api::responses::GenericAPIError; use http::StatusCode; diff --git a/compute_tools/src/http/extract/path.rs b/compute_tools/src/http/extract/path.rs index 09637a96a4..45970cff3d 100644 --- a/compute_tools/src/http/extract/path.rs +++ b/compute_tools/src/http/extract/path.rs @@ -1,8 +1,10 @@ use std::ops::{Deref, DerefMut}; -use axum::extract::{rejection::PathRejection, FromRequestParts}; +use axum::extract::FromRequestParts; +use axum::extract::rejection::PathRejection; use compute_api::responses::GenericAPIError; -use http::{request::Parts, StatusCode}; +use http::StatusCode; +use http::request::Parts; /// Custom `Path` extractor, so that we can format errors into /// `JsonResponse`. diff --git a/compute_tools/src/http/extract/query.rs b/compute_tools/src/http/extract/query.rs index 9dec3642cf..b8079ea770 100644 --- a/compute_tools/src/http/extract/query.rs +++ b/compute_tools/src/http/extract/query.rs @@ -1,8 +1,10 @@ use std::ops::{Deref, DerefMut}; -use axum::extract::{rejection::QueryRejection, FromRequestParts}; +use axum::extract::FromRequestParts; +use axum::extract::rejection::QueryRejection; use compute_api::responses::GenericAPIError; -use http::{request::Parts, StatusCode}; +use http::StatusCode; +use http::request::Parts; /// Custom `Query` extractor, so that we can format errors into /// `JsonResponse`. diff --git a/compute_tools/src/http/mod.rs b/compute_tools/src/http/mod.rs index 93eb6ef5b7..d182278174 100644 --- a/compute_tools/src/http/mod.rs +++ b/compute_tools/src/http/mod.rs @@ -1,6 +1,8 @@ -use axum::{body::Body, response::Response}; +use axum::body::Body; +use axum::response::Response; use compute_api::responses::{ComputeStatus, GenericAPIError}; -use http::{header::CONTENT_TYPE, StatusCode}; +use http::StatusCode; +use http::header::CONTENT_TYPE; use serde::Serialize; use tracing::error; diff --git a/compute_tools/src/http/routes/check_writability.rs b/compute_tools/src/http/routes/check_writability.rs index d7feb055e9..5a12686fa8 100644 --- a/compute_tools/src/http/routes/check_writability.rs +++ b/compute_tools/src/http/routes/check_writability.rs @@ -1,10 +1,13 @@ use std::sync::Arc; -use axum::{extract::State, response::Response}; +use axum::extract::State; +use axum::response::Response; use compute_api::responses::ComputeStatus; use http::StatusCode; -use crate::{checker::check_writability, compute::ComputeNode, http::JsonResponse}; +use crate::checker::check_writability; +use crate::compute::ComputeNode; +use crate::http::JsonResponse; /// Check that the compute is currently running. pub(in crate::http) async fn is_writable(State(compute): State>) -> Response { diff --git a/compute_tools/src/http/routes/configure.rs b/compute_tools/src/http/routes/configure.rs index 2546cbc344..a2892196b7 100644 --- a/compute_tools/src/http/routes/configure.rs +++ b/compute_tools/src/http/routes/configure.rs @@ -1,18 +1,16 @@ use std::sync::Arc; -use axum::{extract::State, response::Response}; -use compute_api::{ - requests::ConfigurationRequest, - responses::{ComputeStatus, ComputeStatusResponse}, -}; +use axum::extract::State; +use axum::response::Response; +use compute_api::requests::ConfigurationRequest; +use compute_api::responses::{ComputeStatus, ComputeStatusResponse}; use http::StatusCode; use tokio::task; use tracing::info; -use crate::{ - compute::{ComputeNode, ParsedSpec}, - http::{extract::Json, JsonResponse}, -}; +use crate::compute::{ComputeNode, ParsedSpec}; +use crate::http::JsonResponse; +use crate::http::extract::Json; // Accept spec in JSON format and request compute configuration. If anything // goes wrong after we set the compute status to `ConfigurationPending` and diff --git a/compute_tools/src/http/routes/database_schema.rs b/compute_tools/src/http/routes/database_schema.rs index fd716272dc..1f6ca4b79d 100644 --- a/compute_tools/src/http/routes/database_schema.rs +++ b/compute_tools/src/http/routes/database_schema.rs @@ -1,14 +1,16 @@ use std::sync::Arc; -use axum::{body::Body, extract::State, response::Response}; -use http::{header::CONTENT_TYPE, StatusCode}; +use axum::body::Body; +use axum::extract::State; +use axum::response::Response; +use http::StatusCode; +use http::header::CONTENT_TYPE; use serde::Deserialize; -use crate::{ - catalog::{get_database_schema, SchemaDumpError}, - compute::ComputeNode, - http::{extract::Query, JsonResponse}, -}; +use crate::catalog::{SchemaDumpError, get_database_schema}; +use crate::compute::ComputeNode; +use crate::http::JsonResponse; +use crate::http::extract::Query; #[derive(Debug, Clone, Deserialize)] pub(in crate::http) struct DatabaseSchemaParams { diff --git a/compute_tools/src/http/routes/dbs_and_roles.rs b/compute_tools/src/http/routes/dbs_and_roles.rs index 4843c3fab4..790fe0dfe3 100644 --- a/compute_tools/src/http/routes/dbs_and_roles.rs +++ b/compute_tools/src/http/routes/dbs_and_roles.rs @@ -1,9 +1,12 @@ use std::sync::Arc; -use axum::{extract::State, response::Response}; +use axum::extract::State; +use axum::response::Response; use http::StatusCode; -use crate::{catalog::get_dbs_and_roles, compute::ComputeNode, http::JsonResponse}; +use crate::catalog::get_dbs_and_roles; +use crate::compute::ComputeNode; +use crate::http::JsonResponse; /// Get the databases and roles from the compute. pub(in crate::http) async fn get_catalog_objects( diff --git a/compute_tools/src/http/routes/extension_server.rs b/compute_tools/src/http/routes/extension_server.rs index 5cc9b6d277..b0265d1e99 100644 --- a/compute_tools/src/http/routes/extension_server.rs +++ b/compute_tools/src/http/routes/extension_server.rs @@ -1,19 +1,13 @@ use std::sync::Arc; -use axum::{ - extract::State, - response::{IntoResponse, Response}, -}; +use axum::extract::State; +use axum::response::{IntoResponse, Response}; use http::StatusCode; use serde::Deserialize; -use crate::{ - compute::ComputeNode, - http::{ - extract::{Path, Query}, - JsonResponse, - }, -}; +use crate::compute::ComputeNode; +use crate::http::JsonResponse; +use crate::http::extract::{Path, Query}; #[derive(Debug, Clone, Deserialize)] pub(in crate::http) struct ExtensionServerParams { diff --git a/compute_tools/src/http/routes/extensions.rs b/compute_tools/src/http/routes/extensions.rs index 1fc03b9109..910e1fa155 100644 --- a/compute_tools/src/http/routes/extensions.rs +++ b/compute_tools/src/http/routes/extensions.rs @@ -1,16 +1,14 @@ use std::sync::Arc; -use axum::{extract::State, response::Response}; -use compute_api::{ - requests::ExtensionInstallRequest, - responses::{ComputeStatus, ExtensionInstallResponse}, -}; +use axum::extract::State; +use axum::response::Response; +use compute_api::requests::ExtensionInstallRequest; +use compute_api::responses::{ComputeStatus, ExtensionInstallResponse}; use http::StatusCode; -use crate::{ - compute::ComputeNode, - http::{extract::Json, JsonResponse}, -}; +use crate::compute::ComputeNode; +use crate::http::JsonResponse; +use crate::http::extract::Json; /// Install a extension. pub(in crate::http) async fn install_extension( diff --git a/compute_tools/src/http/routes/failpoints.rs b/compute_tools/src/http/routes/failpoints.rs index 836417d784..8f5da99963 100644 --- a/compute_tools/src/http/routes/failpoints.rs +++ b/compute_tools/src/http/routes/failpoints.rs @@ -17,7 +17,8 @@ pub struct FailpointConfig { pub actions: String, } -use crate::http::{extract::Json, JsonResponse}; +use crate::http::JsonResponse; +use crate::http::extract::Json; /// Configure failpoints for testing purposes. pub(in crate::http) async fn configure_failpoints( diff --git a/compute_tools/src/http/routes/grants.rs b/compute_tools/src/http/routes/grants.rs index 3f67f011e5..267dcbb27e 100644 --- a/compute_tools/src/http/routes/grants.rs +++ b/compute_tools/src/http/routes/grants.rs @@ -1,16 +1,14 @@ use std::sync::Arc; -use axum::{extract::State, response::Response}; -use compute_api::{ - requests::SetRoleGrantsRequest, - responses::{ComputeStatus, SetRoleGrantsResponse}, -}; +use axum::extract::State; +use axum::response::Response; +use compute_api::requests::SetRoleGrantsRequest; +use compute_api::responses::{ComputeStatus, SetRoleGrantsResponse}; use http::StatusCode; -use crate::{ - compute::ComputeNode, - http::{extract::Json, JsonResponse}, -}; +use crate::compute::ComputeNode; +use crate::http::JsonResponse; +use crate::http::extract::Json; /// Add grants for a role. pub(in crate::http) async fn add_grant( diff --git a/compute_tools/src/http/routes/insights.rs b/compute_tools/src/http/routes/insights.rs index 6b03a461c3..b1ba67161e 100644 --- a/compute_tools/src/http/routes/insights.rs +++ b/compute_tools/src/http/routes/insights.rs @@ -1,10 +1,12 @@ use std::sync::Arc; -use axum::{extract::State, response::Response}; +use axum::extract::State; +use axum::response::Response; use compute_api::responses::ComputeStatus; use http::StatusCode; -use crate::{compute::ComputeNode, http::JsonResponse}; +use crate::compute::ComputeNode; +use crate::http::JsonResponse; /// Collect current Postgres usage insights. pub(in crate::http) async fn get_insights(State(compute): State>) -> Response { diff --git a/compute_tools/src/http/routes/metrics.rs b/compute_tools/src/http/routes/metrics.rs index 13150a7588..da8d8b20a5 100644 --- a/compute_tools/src/http/routes/metrics.rs +++ b/compute_tools/src/http/routes/metrics.rs @@ -1,10 +1,12 @@ -use axum::{body::Body, response::Response}; -use http::header::CONTENT_TYPE; +use axum::body::Body; +use axum::response::Response; use http::StatusCode; +use http::header::CONTENT_TYPE; use metrics::proto::MetricFamily; use metrics::{Encoder, TextEncoder}; -use crate::{http::JsonResponse, metrics::collect}; +use crate::http::JsonResponse; +use crate::metrics::collect; /// Expose Prometheus metrics. pub(in crate::http) async fn get_metrics() -> Response { diff --git a/compute_tools/src/http/routes/metrics_json.rs b/compute_tools/src/http/routes/metrics_json.rs index 0709db5011..bc35ee2645 100644 --- a/compute_tools/src/http/routes/metrics_json.rs +++ b/compute_tools/src/http/routes/metrics_json.rs @@ -1,9 +1,11 @@ use std::sync::Arc; -use axum::{extract::State, response::Response}; +use axum::extract::State; +use axum::response::Response; use http::StatusCode; -use crate::{compute::ComputeNode, http::JsonResponse}; +use crate::compute::ComputeNode; +use crate::http::JsonResponse; /// Get startup metrics. pub(in crate::http) async fn get_metrics(State(compute): State>) -> Response { diff --git a/compute_tools/src/http/routes/status.rs b/compute_tools/src/http/routes/status.rs index d64d53a58f..8ed1299d6b 100644 --- a/compute_tools/src/http/routes/status.rs +++ b/compute_tools/src/http/routes/status.rs @@ -1,9 +1,13 @@ -use std::{ops::Deref, sync::Arc}; +use std::ops::Deref; +use std::sync::Arc; -use axum::{extract::State, http::StatusCode, response::Response}; +use axum::extract::State; +use axum::http::StatusCode; +use axum::response::Response; use compute_api::responses::ComputeStatusResponse; -use crate::{compute::ComputeNode, http::JsonResponse}; +use crate::compute::ComputeNode; +use crate::http::JsonResponse; /// Retrieve the state of the comute. pub(in crate::http) async fn get_status(State(compute): State>) -> Response { diff --git a/compute_tools/src/http/routes/terminate.rs b/compute_tools/src/http/routes/terminate.rs index 7acd84f236..2c24d4ad6b 100644 --- a/compute_tools/src/http/routes/terminate.rs +++ b/compute_tools/src/http/routes/terminate.rs @@ -1,18 +1,14 @@ use std::sync::Arc; -use axum::{ - extract::State, - response::{IntoResponse, Response}, -}; +use axum::extract::State; +use axum::response::{IntoResponse, Response}; use compute_api::responses::ComputeStatus; use http::StatusCode; use tokio::task; use tracing::info; -use crate::{ - compute::{forward_termination_signal, ComputeNode}, - http::JsonResponse, -}; +use crate::compute::{ComputeNode, forward_termination_signal}; +use crate::http::JsonResponse; /// Terminate the compute. pub(in crate::http) async fn terminate(State(compute): State>) -> Response { diff --git a/compute_tools/src/http/server.rs b/compute_tools/src/http/server.rs index a523ecd96f..efd18afc78 100644 --- a/compute_tools/src/http/server.rs +++ b/compute_tools/src/http/server.rs @@ -1,23 +1,20 @@ -use std::{ - fmt::Display, - net::{IpAddr, Ipv6Addr, SocketAddr}, - sync::Arc, - time::Duration, -}; +use std::fmt::Display; +use std::net::{IpAddr, Ipv6Addr, SocketAddr}; +use std::sync::Arc; +use std::time::Duration; use anyhow::Result; -use axum::{ - extract::Request, - middleware::{self, Next}, - response::{IntoResponse, Response}, - routing::{get, post}, - Router, -}; +use axum::Router; +use axum::extract::Request; +use axum::middleware::{self, Next}; +use axum::response::{IntoResponse, Response}; +use axum::routing::{get, post}; use http::StatusCode; use tokio::net::TcpListener; use tower::ServiceBuilder; -use tower_http::{request_id::PropagateRequestIdLayer, trace::TraceLayer}; -use tracing::{debug, error, info, Span}; +use tower_http::request_id::PropagateRequestIdLayer; +use tower_http::trace::TraceLayer; +use tracing::{Span, debug, error, info}; use uuid::Uuid; use super::routes::{ diff --git a/compute_tools/src/installed_extensions.rs b/compute_tools/src/installed_extensions.rs index 173dbf40b0..6921505466 100644 --- a/compute_tools/src/installed_extensions.rs +++ b/compute_tools/src/installed_extensions.rs @@ -1,7 +1,7 @@ -use compute_api::responses::{InstalledExtension, InstalledExtensions}; use std::collections::HashMap; use anyhow::Result; +use compute_api::responses::{InstalledExtension, InstalledExtensions}; use postgres::{Client, NoTls}; use crate::metrics::INSTALLED_EXTENSIONS; diff --git a/compute_tools/src/lsn_lease.rs b/compute_tools/src/lsn_lease.rs index 3061d387a5..b4ec675ff4 100644 --- a/compute_tools/src/lsn_lease.rs +++ b/compute_tools/src/lsn_lease.rs @@ -1,17 +1,15 @@ -use anyhow::bail; -use anyhow::Result; -use postgres::{NoTls, SimpleQueryMessage}; -use std::time::SystemTime; -use std::{str::FromStr, sync::Arc, thread, time::Duration}; -use utils::id::TenantId; -use utils::id::TimelineId; +use std::str::FromStr; +use std::sync::Arc; +use std::thread; +use std::time::{Duration, SystemTime}; +use anyhow::{Result, bail}; use compute_api::spec::ComputeMode; +use postgres::{NoTls, SimpleQueryMessage}; use tracing::{info, warn}; -use utils::{ - lsn::Lsn, - shard::{ShardCount, ShardNumber, TenantShardId}, -}; +use utils::id::{TenantId, TimelineId}; +use utils::lsn::Lsn; +use utils::shard::{ShardCount, ShardNumber, TenantShardId}; use crate::compute::ComputeNode; diff --git a/compute_tools/src/metrics.rs b/compute_tools/src/metrics.rs index 870b294d08..bc96e5074c 100644 --- a/compute_tools/src/metrics.rs +++ b/compute_tools/src/metrics.rs @@ -1,6 +1,6 @@ use metrics::core::Collector; use metrics::proto::MetricFamily; -use metrics::{register_int_counter_vec, register_uint_gauge_vec, IntCounterVec, UIntGaugeVec}; +use metrics::{IntCounterVec, UIntGaugeVec, register_int_counter_vec, register_uint_gauge_vec}; use once_cell::sync::Lazy; pub(crate) static INSTALLED_EXTENSIONS: Lazy = Lazy::new(|| { diff --git a/compute_tools/src/monitor.rs b/compute_tools/src/monitor.rs index 184f380a8d..248505e473 100644 --- a/compute_tools/src/monitor.rs +++ b/compute_tools/src/monitor.rs @@ -1,13 +1,14 @@ use std::sync::Arc; -use std::{thread, time::Duration}; +use std::thread; +use std::time::Duration; use chrono::{DateTime, Utc}; +use compute_api::responses::ComputeStatus; +use compute_api::spec::ComputeFeature; use postgres::{Client, NoTls}; use tracing::{debug, error, info, warn}; use crate::compute::ComputeNode; -use compute_api::responses::ComputeStatus; -use compute_api::spec::ComputeFeature; const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500); diff --git a/compute_tools/src/pg_helpers.rs b/compute_tools/src/pg_helpers.rs index 86fcf99085..5a2e305e1d 100644 --- a/compute_tools/src/pg_helpers.rs +++ b/compute_tools/src/pg_helpers.rs @@ -9,7 +9,8 @@ use std::process::Child; use std::str::FromStr; use std::time::{Duration, Instant}; -use anyhow::{bail, Result}; +use anyhow::{Result, bail}; +use compute_api::spec::{Database, GenericOption, GenericOptions, PgIdent, Role}; use futures::StreamExt; use ini::Ini; use notify::{RecursiveMode, Watcher}; @@ -21,8 +22,6 @@ use tokio_postgres; use tokio_postgres::NoTls; use tracing::{debug, error, info, instrument}; -use compute_api::spec::{Database, GenericOption, GenericOptions, PgIdent, Role}; - const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds /// Escape a string for including it in a SQL literal. diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index 9ca67aba44..1d19f2738d 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -1,20 +1,20 @@ -use anyhow::{anyhow, bail, Result}; -use reqwest::StatusCode; use std::fs::File; use std::path::Path; -use tokio_postgres::Client; -use tracing::{error, info, instrument, warn}; - -use crate::config; -use crate::metrics::{CPlaneRequestRPC, CPLANE_REQUESTS_TOTAL, UNKNOWN_HTTP_STATUS}; -use crate::migration::MigrationRunner; -use crate::params::PG_HBA_ALL_MD5; -use crate::pg_helpers::*; +use anyhow::{Result, anyhow, bail}; use compute_api::responses::{ ComputeCtlConfig, ControlPlaneComputeStatus, ControlPlaneSpecResponse, }; use compute_api::spec::ComputeSpec; +use reqwest::StatusCode; +use tokio_postgres::Client; +use tracing::{error, info, instrument, warn}; + +use crate::config; +use crate::metrics::{CPLANE_REQUESTS_TOTAL, CPlaneRequestRPC, UNKNOWN_HTTP_STATUS}; +use crate::migration::MigrationRunner; +use crate::params::PG_HBA_ALL_MD5; +use crate::pg_helpers::*; // Do control plane request and return response if any. In case of error it // returns a bool flag indicating whether it makes sense to retry the request diff --git a/compute_tools/src/spec_apply.rs b/compute_tools/src/spec_apply.rs index c4416480d8..b4e084fd91 100644 --- a/compute_tools/src/spec_apply.rs +++ b/compute_tools/src/spec_apply.rs @@ -1,18 +1,18 @@ use std::collections::{HashMap, HashSet}; use std::fmt::{Debug, Formatter}; use std::future::Future; -use std::iter::empty; -use std::iter::once; +use std::iter::{empty, once}; use std::sync::Arc; -use crate::compute::construct_superuser_query; -use crate::pg_helpers::{escape_literal, DatabaseExt, Escaping, GenericOptionsSearch, RoleExt}; use anyhow::Result; use compute_api::spec::{ComputeFeature, ComputeSpec, Database, PgIdent, Role}; use futures::future::join_all; use tokio::sync::RwLock; use tokio_postgres::Client; -use tracing::{debug, info_span, warn, Instrument}; +use tracing::{Instrument, debug, info_span, warn}; + +use crate::compute::construct_superuser_query; +use crate::pg_helpers::{DatabaseExt, Escaping, GenericOptionsSearch, RoleExt, escape_literal}; #[derive(Clone)] pub enum DB { @@ -474,7 +474,10 @@ async fn get_operations<'a>( let edb = match databases.get(&db.name) { Some(edb) => edb, None => { - warn!("skipping RunInEachDatabase phase {:?}, database {} doesn't exist in PostgreSQL", subphase, db.name); + warn!( + "skipping RunInEachDatabase phase {:?}, database {} doesn't exist in PostgreSQL", + subphase, db.name + ); return Ok(Box::new(empty())); } }; diff --git a/compute_tools/src/swap.rs b/compute_tools/src/swap.rs index 7ba8cb5503..ed27a7cba4 100644 --- a/compute_tools/src/swap.rs +++ b/compute_tools/src/swap.rs @@ -1,6 +1,6 @@ use std::path::Path; -use anyhow::{anyhow, Context}; +use anyhow::{Context, anyhow}; use tracing::{instrument, warn}; pub const RESIZE_SWAP_BIN: &str = "/neonvm/bin/resize-swap"; diff --git a/compute_tools/tests/config_test.rs b/compute_tools/tests/config_test.rs index 9ab16b1930..7b2bff23d5 100644 --- a/compute_tools/tests/config_test.rs +++ b/compute_tools/tests/config_test.rs @@ -1,7 +1,7 @@ #[cfg(test)] mod config_tests { - use std::fs::{remove_file, File}; + use std::fs::{File, remove_file}; use std::io::{Read, Write}; use std::path::Path; diff --git a/libs/compute_api/Cargo.toml b/libs/compute_api/Cargo.toml index c11a1b6688..0d1618c1b2 100644 --- a/libs/compute_api/Cargo.toml +++ b/libs/compute_api/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "compute_api" version = "0.1.0" -edition.workspace = true +edition = "2024" license.workspace = true [dependencies] diff --git a/libs/compute_api/src/requests.rs b/libs/compute_api/src/requests.rs index 0c256cae2e..3fbdfcf83f 100644 --- a/libs/compute_api/src/requests.rs +++ b/libs/compute_api/src/requests.rs @@ -1,11 +1,10 @@ //! Structs representing the JSON formats used in the compute_ctl's HTTP API. -use crate::{ - privilege::Privilege, - responses::ComputeCtlConfig, - spec::{ComputeSpec, ExtVersion, PgIdent}, -}; use serde::{Deserialize, Serialize}; +use crate::privilege::Privilege; +use crate::responses::ComputeCtlConfig; +use crate::spec::{ComputeSpec, ExtVersion, PgIdent}; + /// Request of the /configure API /// /// We now pass only `spec` in the configuration request, but later we can diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs index a6248019d9..35c580bd37 100644 --- a/libs/compute_api/src/responses.rs +++ b/libs/compute_api/src/responses.rs @@ -6,10 +6,8 @@ use chrono::{DateTime, Utc}; use jsonwebtoken::jwk::JwkSet; use serde::{Deserialize, Serialize, Serializer}; -use crate::{ - privilege::Privilege, - spec::{ComputeSpec, Database, ExtVersion, PgIdent, Role}, -}; +use crate::privilege::Privilege; +use crate::spec::{ComputeSpec, Database, ExtVersion, PgIdent, Role}; #[derive(Serialize, Debug, Deserialize)] pub struct GenericAPIError { diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index 8fffae92fb..d02bfd6814 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -5,13 +5,12 @@ //! and connect it to the storage nodes. use std::collections::HashMap; +use regex::Regex; +use remote_storage::RemotePath; use serde::{Deserialize, Serialize}; use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; -use regex::Regex; -use remote_storage::RemotePath; - /// String type alias representing Postgres identifier and /// intended to be used for DB / role names. pub type PgIdent = String; @@ -339,9 +338,10 @@ pub struct JwksSettings { #[cfg(test)] mod tests { - use super::*; use std::fs::File; + use super::*; + #[test] fn allow_installing_remote_extensions() { let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({