Upgrade compute_tools and compute_api to edition 2024 (#10983)

Updates `compute_tools` and `compute_api` crates to edition 2024. We
like to stay on the latest edition if possible. There is no functional
changes, however some code changes had to be done to accommodate the
edition's breaking changes.

The PR has three commits:

* the first commit updates the named crates to edition 2024 and appeases
`cargo clippy` by changing code.
* the second commit performs a `cargo fmt` that does some minor changes
(not many)
* the third commit performs a cargo fmt with nightly options to reorder
imports as a one-time thing. it's completely optional, but I offer it
here for the compute team to review it.

I'd like to hear opinions about the third commit, if it's wanted and
felt worth the diff or not. I think most attention should be put onto
the first commit.

Part of #10918
This commit is contained in:
Arpad Müller
2025-02-26 14:12:26 +01:00
committed by GitHub
parent c2a768086d
commit f94286f0c9
42 changed files with 215 additions and 208 deletions

View File

@@ -1,7 +1,7 @@
[package]
name = "compute_tools"
version = "0.1.0"
edition.workspace = true
edition = "2024"
license.workspace = true
[features]

View File

@@ -40,34 +40,33 @@ use std::path::Path;
use std::process::exit;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::{mpsc, Arc, Condvar, Mutex, RwLock};
use std::{thread, time::Duration};
use std::sync::{Arc, Condvar, Mutex, RwLock, mpsc};
use std::thread;
use std::time::Duration;
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Parser;
use compute_tools::disk_quota::set_disk_quota;
use compute_tools::http::server::Server;
use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static;
use signal_hook::consts::{SIGQUIT, SIGTERM};
use signal_hook::{consts::SIGINT, iterator::Signals};
use tracing::{error, info, warn};
use url::Url;
use compute_api::responses::{ComputeCtlConfig, ComputeStatus};
use compute_api::spec::ComputeSpec;
use compute_tools::compute::{
forward_termination_signal, ComputeNode, ComputeState, ParsedSpec, PG_PID,
ComputeNode, ComputeState, PG_PID, ParsedSpec, forward_termination_signal,
};
use compute_tools::configurator::launch_configurator;
use compute_tools::disk_quota::set_disk_quota;
use compute_tools::extension_server::get_pg_version_string;
use compute_tools::http::server::Server;
use compute_tools::logger::*;
use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static;
use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
use compute_tools::spec::*;
use compute_tools::swap::resize_swap;
use rlimit::{setrlimit, Resource};
use rlimit::{Resource, setrlimit};
use signal_hook::consts::{SIGINT, SIGQUIT, SIGTERM};
use signal_hook::iterator::Signals;
use tracing::{error, info, warn};
use url::Url;
use utils::failpoint_support;
// this is an arbitrary build tag. Fine as a default / for testing purposes
@@ -149,6 +148,8 @@ struct Cli {
fn main() -> Result<()> {
let cli = Cli::parse();
let scenario = failpoint_support::init();
// For historical reasons, the main thread that processes the spec and launches postgres
// is synchronous, but we always have this tokio runtime available and we "enter" it so
// that you can use tokio::spawn() and tokio::runtime::Handle::current().block_on(...)
@@ -160,8 +161,6 @@ fn main() -> Result<()> {
let build_tag = runtime.block_on(init())?;
let scenario = failpoint_support::init();
// enable core dumping for all child processes
setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;

View File

@@ -25,13 +25,13 @@
//! docker push localhost:3030/localregistry/compute-node-v14:latest
//! ```
use anyhow::{bail, Context};
use anyhow::{Context, bail};
use aws_config::BehaviorVersion;
use camino::{Utf8Path, Utf8PathBuf};
use clap::{Parser, Subcommand};
use compute_tools::extension_server::{get_pg_version, PostgresMajorVersion};
use compute_tools::extension_server::{PostgresMajorVersion, get_pg_version};
use nix::unistd::Pid;
use tracing::{error, info, info_span, warn, Instrument};
use tracing::{Instrument, error, info, info_span, warn};
use utils::fs_ext::is_directory_empty;
#[path = "fast_import/aws_s3_sync.rs"]
@@ -558,7 +558,9 @@ async fn cmd_dumprestore(
decode_connstring(kms_client.as_ref().unwrap(), &key_id, dest_ciphertext)
.await?
} else {
bail!("destination connection string must be provided in spec for dump_restore command");
bail!(
"destination connection string must be provided in spec for dump_restore command"
);
};
(source, dest)

View File

@@ -1,11 +1,10 @@
use camino::{Utf8Path, Utf8PathBuf};
use tokio::task::JoinSet;
use tracing::{info, warn};
use walkdir::WalkDir;
use super::s3_uri::S3Uri;
use tracing::{info, warn};
const MAX_PARALLEL_UPLOADS: usize = 10;
/// Upload all files from 'local' to 'remote'

View File

@@ -1,6 +1,7 @@
use anyhow::Result;
use std::str::FromStr;
use anyhow::Result;
/// Struct to hold parsed S3 components
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct S3Uri {

View File

@@ -1,18 +1,20 @@
use std::path::Path;
use std::process::Stdio;
use std::result::Result;
use std::sync::Arc;
use compute_api::responses::CatalogObjects;
use futures::Stream;
use postgres::NoTls;
use std::{path::Path, process::Stdio, result::Result, sync::Arc};
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::Command,
spawn,
};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use tokio::spawn;
use tokio_stream::{self as stream, StreamExt};
use tokio_util::codec::{BytesCodec, FramedRead};
use tracing::warn;
use crate::compute::ComputeNode;
use crate::pg_helpers::{get_existing_dbs_async, get_existing_roles_async, postgres_conf_for_db};
use compute_api::responses::CatalogObjects;
pub async fn get_dbs_and_roles(compute: &Arc<ComputeNode>) -> anyhow::Result<CatalogObjects> {
let conf = compute.get_tokio_conn_conf(Some("compute_ctl:get_dbs_and_roles"));
@@ -55,7 +57,7 @@ pub enum SchemaDumpError {
pub async fn get_database_schema(
compute: &Arc<ComputeNode>,
dbname: &str,
) -> Result<impl Stream<Item = Result<bytes::Bytes, std::io::Error>>, SchemaDumpError> {
) -> Result<impl Stream<Item = Result<bytes::Bytes, std::io::Error>> + use<>, SchemaDumpError> {
let pgbin = &compute.pgbin;
let basepath = Path::new(pgbin).parent().unwrap();
let pgdump = basepath.join("pg_dump");

View File

@@ -1,4 +1,4 @@
use anyhow::{anyhow, Ok, Result};
use anyhow::{Ok, Result, anyhow};
use tokio_postgres::NoTls;
use tracing::{error, instrument, warn};

View File

@@ -1,42 +1,37 @@
use std::collections::{HashMap, HashSet};
use std::env;
use std::fs;
use std::iter::once;
use std::os::unix::fs::{symlink, PermissionsExt};
use std::os::unix::fs::{PermissionsExt, symlink};
use std::path::Path;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::time::Duration;
use std::time::Instant;
use std::time::{Duration, Instant};
use std::{env, fs};
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use compute_api::spec::{Database, PgIdent, Role};
use compute_api::privilege::Privilege;
use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::{
ComputeFeature, ComputeMode, ComputeSpec, Database, ExtVersion, PgIdent, Role,
};
use futures::StreamExt;
use futures::future::join_all;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use nix::sys::signal::{Signal, kill};
use nix::unistd::Pid;
use postgres;
use postgres::error::SqlState;
use postgres::NoTls;
use postgres::error::SqlState;
use remote_storage::{DownloadError, RemotePath};
use tokio::spawn;
use tracing::{debug, error, info, instrument, warn};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use compute_api::privilege::Privilege;
use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::{ComputeFeature, ComputeMode, ComputeSpec, ExtVersion};
use utils::measured_stream::MeasuredReader;
use nix::sys::signal::{kill, Signal};
use remote_storage::{DownloadError, RemotePath};
use tokio::spawn;
use crate::installed_extensions::get_installed_extensions;
use crate::local_proxy;
use crate::pg_helpers::*;
use crate::spec::*;
use crate::spec_apply::ApplySpecPhase::{
@@ -45,13 +40,12 @@ use crate::spec_apply::ApplySpecPhase::{
HandleNeonExtension, HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles,
RunInEachDatabase,
};
use crate::spec_apply::PerDatabasePhase;
use crate::spec_apply::PerDatabasePhase::{
ChangeSchemaPerms, DeleteDBRoleReferences, DropLogicalSubscriptions, HandleAnonExtension,
};
use crate::spec_apply::{apply_operations, MutableApplyContext, DB};
use crate::spec_apply::{DB, MutableApplyContext, PerDatabasePhase, apply_operations};
use crate::sync_sk::{check_if_synced, ping_safekeeper};
use crate::{config, extension_server};
use crate::{config, extension_server, local_proxy};
pub static SYNC_SAFEKEEPERS_PID: AtomicU32 = AtomicU32::new(0);
pub static PG_PID: AtomicU32 = AtomicU32::new(0);
@@ -1318,7 +1312,7 @@ impl ComputeNode {
// Merge-apply spec & changes to PostgreSQL state.
self.apply_spec_sql(spec.clone(), conf.clone(), max_concurrent_connections)?;
if let Some(ref local_proxy) = &spec.clone().local_proxy_config {
if let Some(local_proxy) = &spec.clone().local_proxy_config {
info!("configuring local_proxy");
local_proxy::configure(local_proxy).context("apply_config local_proxy")?;
}
@@ -1538,7 +1532,9 @@ impl ComputeNode {
&postgresql_conf_path,
"neon.disable_logical_replication_subscribers=false",
)? {
info!("updated postgresql.conf to set neon.disable_logical_replication_subscribers=false");
info!(
"updated postgresql.conf to set neon.disable_logical_replication_subscribers=false"
);
}
self.pg_reload_conf()?;
}
@@ -1765,7 +1761,9 @@ LIMIT 100",
info!("extension already downloaded, skipping re-download");
return Ok(0);
} else if start_time_delta < HANG_TIMEOUT && !first_try {
info!("download {ext_archive_name} already started by another process, hanging untill completion or timeout");
info!(
"download {ext_archive_name} already started by another process, hanging untill completion or timeout"
);
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(500));
loop {
info!("waiting for download");

View File

@@ -4,11 +4,10 @@ use std::io::prelude::*;
use std::path::Path;
use anyhow::Result;
use crate::pg_helpers::escape_conf_value;
use crate::pg_helpers::{GenericOptionExt, PgOptionsSerialize};
use compute_api::spec::{ComputeMode, ComputeSpec, GenericOption};
use crate::pg_helpers::{GenericOptionExt, PgOptionsSerialize, escape_conf_value};
/// Check that `line` is inside a text file and put it there if it is not.
/// Create file if it doesn't exist.
pub fn line_in_file(path: &Path, line: &str) -> Result<bool> {

View File

@@ -1,9 +1,8 @@
use std::sync::Arc;
use std::thread;
use tracing::{error, info, instrument};
use compute_api::responses::ComputeStatus;
use tracing::{error, info, instrument};
use crate::compute::ComputeNode;

View File

@@ -71,15 +71,15 @@ More specifically, here is an example ext_index.json
}
}
*/
use anyhow::Result;
use anyhow::{bail, Context};
use std::path::Path;
use std::str;
use anyhow::{Context, Result, bail};
use bytes::Bytes;
use compute_api::spec::RemoteExtSpec;
use regex::Regex;
use remote_storage::*;
use reqwest::StatusCode;
use std::path::Path;
use std::str;
use tar::Archive;
use tracing::info;
use tracing::log::warn;
@@ -244,7 +244,10 @@ pub fn create_control_files(remote_extensions: &RemoteExtSpec, pgbin: &str) {
info!("writing file {:?}{:?}", control_path, control_content);
std::fs::write(control_path, control_content).unwrap();
} else {
warn!("control file {:?} exists both locally and remotely. ignoring the remote version.", control_path);
warn!(
"control file {:?} exists both locally and remotely. ignoring the remote version.",
control_path
);
}
}
}

View File

@@ -1,6 +1,7 @@
use std::ops::{Deref, DerefMut};
use axum::extract::{rejection::JsonRejection, FromRequest, Request};
use axum::extract::rejection::JsonRejection;
use axum::extract::{FromRequest, Request};
use compute_api::responses::GenericAPIError;
use http::StatusCode;

View File

@@ -1,8 +1,10 @@
use std::ops::{Deref, DerefMut};
use axum::extract::{rejection::PathRejection, FromRequestParts};
use axum::extract::FromRequestParts;
use axum::extract::rejection::PathRejection;
use compute_api::responses::GenericAPIError;
use http::{request::Parts, StatusCode};
use http::StatusCode;
use http::request::Parts;
/// Custom `Path` extractor, so that we can format errors into
/// `JsonResponse<GenericAPIError>`.

View File

@@ -1,8 +1,10 @@
use std::ops::{Deref, DerefMut};
use axum::extract::{rejection::QueryRejection, FromRequestParts};
use axum::extract::FromRequestParts;
use axum::extract::rejection::QueryRejection;
use compute_api::responses::GenericAPIError;
use http::{request::Parts, StatusCode};
use http::StatusCode;
use http::request::Parts;
/// Custom `Query` extractor, so that we can format errors into
/// `JsonResponse<GenericAPIError>`.

View File

@@ -1,6 +1,8 @@
use axum::{body::Body, response::Response};
use axum::body::Body;
use axum::response::Response;
use compute_api::responses::{ComputeStatus, GenericAPIError};
use http::{header::CONTENT_TYPE, StatusCode};
use http::StatusCode;
use http::header::CONTENT_TYPE;
use serde::Serialize;
use tracing::error;

View File

@@ -1,10 +1,13 @@
use std::sync::Arc;
use axum::{extract::State, response::Response};
use axum::extract::State;
use axum::response::Response;
use compute_api::responses::ComputeStatus;
use http::StatusCode;
use crate::{checker::check_writability, compute::ComputeNode, http::JsonResponse};
use crate::checker::check_writability;
use crate::compute::ComputeNode;
use crate::http::JsonResponse;
/// Check that the compute is currently running.
pub(in crate::http) async fn is_writable(State(compute): State<Arc<ComputeNode>>) -> Response {

View File

@@ -1,18 +1,16 @@
use std::sync::Arc;
use axum::{extract::State, response::Response};
use compute_api::{
requests::ConfigurationRequest,
responses::{ComputeStatus, ComputeStatusResponse},
};
use axum::extract::State;
use axum::response::Response;
use compute_api::requests::ConfigurationRequest;
use compute_api::responses::{ComputeStatus, ComputeStatusResponse};
use http::StatusCode;
use tokio::task;
use tracing::info;
use crate::{
compute::{ComputeNode, ParsedSpec},
http::{extract::Json, JsonResponse},
};
use crate::compute::{ComputeNode, ParsedSpec};
use crate::http::JsonResponse;
use crate::http::extract::Json;
// Accept spec in JSON format and request compute configuration. If anything
// goes wrong after we set the compute status to `ConfigurationPending` and

View File

@@ -1,14 +1,16 @@
use std::sync::Arc;
use axum::{body::Body, extract::State, response::Response};
use http::{header::CONTENT_TYPE, StatusCode};
use axum::body::Body;
use axum::extract::State;
use axum::response::Response;
use http::StatusCode;
use http::header::CONTENT_TYPE;
use serde::Deserialize;
use crate::{
catalog::{get_database_schema, SchemaDumpError},
compute::ComputeNode,
http::{extract::Query, JsonResponse},
};
use crate::catalog::{SchemaDumpError, get_database_schema};
use crate::compute::ComputeNode;
use crate::http::JsonResponse;
use crate::http::extract::Query;
#[derive(Debug, Clone, Deserialize)]
pub(in crate::http) struct DatabaseSchemaParams {

View File

@@ -1,9 +1,12 @@
use std::sync::Arc;
use axum::{extract::State, response::Response};
use axum::extract::State;
use axum::response::Response;
use http::StatusCode;
use crate::{catalog::get_dbs_and_roles, compute::ComputeNode, http::JsonResponse};
use crate::catalog::get_dbs_and_roles;
use crate::compute::ComputeNode;
use crate::http::JsonResponse;
/// Get the databases and roles from the compute.
pub(in crate::http) async fn get_catalog_objects(

View File

@@ -1,19 +1,13 @@
use std::sync::Arc;
use axum::{
extract::State,
response::{IntoResponse, Response},
};
use axum::extract::State;
use axum::response::{IntoResponse, Response};
use http::StatusCode;
use serde::Deserialize;
use crate::{
compute::ComputeNode,
http::{
extract::{Path, Query},
JsonResponse,
},
};
use crate::compute::ComputeNode;
use crate::http::JsonResponse;
use crate::http::extract::{Path, Query};
#[derive(Debug, Clone, Deserialize)]
pub(in crate::http) struct ExtensionServerParams {

View File

@@ -1,16 +1,14 @@
use std::sync::Arc;
use axum::{extract::State, response::Response};
use compute_api::{
requests::ExtensionInstallRequest,
responses::{ComputeStatus, ExtensionInstallResponse},
};
use axum::extract::State;
use axum::response::Response;
use compute_api::requests::ExtensionInstallRequest;
use compute_api::responses::{ComputeStatus, ExtensionInstallResponse};
use http::StatusCode;
use crate::{
compute::ComputeNode,
http::{extract::Json, JsonResponse},
};
use crate::compute::ComputeNode;
use crate::http::JsonResponse;
use crate::http::extract::Json;
/// Install a extension.
pub(in crate::http) async fn install_extension(

View File

@@ -17,7 +17,8 @@ pub struct FailpointConfig {
pub actions: String,
}
use crate::http::{extract::Json, JsonResponse};
use crate::http::JsonResponse;
use crate::http::extract::Json;
/// Configure failpoints for testing purposes.
pub(in crate::http) async fn configure_failpoints(

View File

@@ -1,16 +1,14 @@
use std::sync::Arc;
use axum::{extract::State, response::Response};
use compute_api::{
requests::SetRoleGrantsRequest,
responses::{ComputeStatus, SetRoleGrantsResponse},
};
use axum::extract::State;
use axum::response::Response;
use compute_api::requests::SetRoleGrantsRequest;
use compute_api::responses::{ComputeStatus, SetRoleGrantsResponse};
use http::StatusCode;
use crate::{
compute::ComputeNode,
http::{extract::Json, JsonResponse},
};
use crate::compute::ComputeNode;
use crate::http::JsonResponse;
use crate::http::extract::Json;
/// Add grants for a role.
pub(in crate::http) async fn add_grant(

View File

@@ -1,10 +1,12 @@
use std::sync::Arc;
use axum::{extract::State, response::Response};
use axum::extract::State;
use axum::response::Response;
use compute_api::responses::ComputeStatus;
use http::StatusCode;
use crate::{compute::ComputeNode, http::JsonResponse};
use crate::compute::ComputeNode;
use crate::http::JsonResponse;
/// Collect current Postgres usage insights.
pub(in crate::http) async fn get_insights(State(compute): State<Arc<ComputeNode>>) -> Response {

View File

@@ -1,10 +1,12 @@
use axum::{body::Body, response::Response};
use http::header::CONTENT_TYPE;
use axum::body::Body;
use axum::response::Response;
use http::StatusCode;
use http::header::CONTENT_TYPE;
use metrics::proto::MetricFamily;
use metrics::{Encoder, TextEncoder};
use crate::{http::JsonResponse, metrics::collect};
use crate::http::JsonResponse;
use crate::metrics::collect;
/// Expose Prometheus metrics.
pub(in crate::http) async fn get_metrics() -> Response {

View File

@@ -1,9 +1,11 @@
use std::sync::Arc;
use axum::{extract::State, response::Response};
use axum::extract::State;
use axum::response::Response;
use http::StatusCode;
use crate::{compute::ComputeNode, http::JsonResponse};
use crate::compute::ComputeNode;
use crate::http::JsonResponse;
/// Get startup metrics.
pub(in crate::http) async fn get_metrics(State(compute): State<Arc<ComputeNode>>) -> Response {

View File

@@ -1,9 +1,13 @@
use std::{ops::Deref, sync::Arc};
use std::ops::Deref;
use std::sync::Arc;
use axum::{extract::State, http::StatusCode, response::Response};
use axum::extract::State;
use axum::http::StatusCode;
use axum::response::Response;
use compute_api::responses::ComputeStatusResponse;
use crate::{compute::ComputeNode, http::JsonResponse};
use crate::compute::ComputeNode;
use crate::http::JsonResponse;
/// Retrieve the state of the comute.
pub(in crate::http) async fn get_status(State(compute): State<Arc<ComputeNode>>) -> Response {

View File

@@ -1,18 +1,14 @@
use std::sync::Arc;
use axum::{
extract::State,
response::{IntoResponse, Response},
};
use axum::extract::State;
use axum::response::{IntoResponse, Response};
use compute_api::responses::ComputeStatus;
use http::StatusCode;
use tokio::task;
use tracing::info;
use crate::{
compute::{forward_termination_signal, ComputeNode},
http::JsonResponse,
};
use crate::compute::{ComputeNode, forward_termination_signal};
use crate::http::JsonResponse;
/// Terminate the compute.
pub(in crate::http) async fn terminate(State(compute): State<Arc<ComputeNode>>) -> Response {

View File

@@ -1,23 +1,20 @@
use std::{
fmt::Display,
net::{IpAddr, Ipv6Addr, SocketAddr},
sync::Arc,
time::Duration,
};
use std::fmt::Display;
use std::net::{IpAddr, Ipv6Addr, SocketAddr};
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use axum::{
extract::Request,
middleware::{self, Next},
response::{IntoResponse, Response},
routing::{get, post},
Router,
};
use axum::Router;
use axum::extract::Request;
use axum::middleware::{self, Next};
use axum::response::{IntoResponse, Response};
use axum::routing::{get, post};
use http::StatusCode;
use tokio::net::TcpListener;
use tower::ServiceBuilder;
use tower_http::{request_id::PropagateRequestIdLayer, trace::TraceLayer};
use tracing::{debug, error, info, Span};
use tower_http::request_id::PropagateRequestIdLayer;
use tower_http::trace::TraceLayer;
use tracing::{Span, debug, error, info};
use uuid::Uuid;
use super::routes::{

View File

@@ -1,7 +1,7 @@
use compute_api::responses::{InstalledExtension, InstalledExtensions};
use std::collections::HashMap;
use anyhow::Result;
use compute_api::responses::{InstalledExtension, InstalledExtensions};
use postgres::{Client, NoTls};
use crate::metrics::INSTALLED_EXTENSIONS;

View File

@@ -1,17 +1,15 @@
use anyhow::bail;
use anyhow::Result;
use postgres::{NoTls, SimpleQueryMessage};
use std::time::SystemTime;
use std::{str::FromStr, sync::Arc, thread, time::Duration};
use utils::id::TenantId;
use utils::id::TimelineId;
use std::str::FromStr;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, SystemTime};
use anyhow::{Result, bail};
use compute_api::spec::ComputeMode;
use postgres::{NoTls, SimpleQueryMessage};
use tracing::{info, warn};
use utils::{
lsn::Lsn,
shard::{ShardCount, ShardNumber, TenantShardId},
};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::shard::{ShardCount, ShardNumber, TenantShardId};
use crate::compute::ComputeNode;

View File

@@ -1,6 +1,6 @@
use metrics::core::Collector;
use metrics::proto::MetricFamily;
use metrics::{register_int_counter_vec, register_uint_gauge_vec, IntCounterVec, UIntGaugeVec};
use metrics::{IntCounterVec, UIntGaugeVec, register_int_counter_vec, register_uint_gauge_vec};
use once_cell::sync::Lazy;
pub(crate) static INSTALLED_EXTENSIONS: Lazy<UIntGaugeVec> = Lazy::new(|| {

View File

@@ -1,13 +1,14 @@
use std::sync::Arc;
use std::{thread, time::Duration};
use std::thread;
use std::time::Duration;
use chrono::{DateTime, Utc};
use compute_api::responses::ComputeStatus;
use compute_api::spec::ComputeFeature;
use postgres::{Client, NoTls};
use tracing::{debug, error, info, warn};
use crate::compute::ComputeNode;
use compute_api::responses::ComputeStatus;
use compute_api::spec::ComputeFeature;
const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);

View File

@@ -9,7 +9,8 @@ use std::process::Child;
use std::str::FromStr;
use std::time::{Duration, Instant};
use anyhow::{bail, Result};
use anyhow::{Result, bail};
use compute_api::spec::{Database, GenericOption, GenericOptions, PgIdent, Role};
use futures::StreamExt;
use ini::Ini;
use notify::{RecursiveMode, Watcher};
@@ -21,8 +22,6 @@ use tokio_postgres;
use tokio_postgres::NoTls;
use tracing::{debug, error, info, instrument};
use compute_api::spec::{Database, GenericOption, GenericOptions, PgIdent, Role};
const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds
/// Escape a string for including it in a SQL literal.

View File

@@ -1,20 +1,20 @@
use anyhow::{anyhow, bail, Result};
use reqwest::StatusCode;
use std::fs::File;
use std::path::Path;
use tokio_postgres::Client;
use tracing::{error, info, instrument, warn};
use crate::config;
use crate::metrics::{CPlaneRequestRPC, CPLANE_REQUESTS_TOTAL, UNKNOWN_HTTP_STATUS};
use crate::migration::MigrationRunner;
use crate::params::PG_HBA_ALL_MD5;
use crate::pg_helpers::*;
use anyhow::{Result, anyhow, bail};
use compute_api::responses::{
ComputeCtlConfig, ControlPlaneComputeStatus, ControlPlaneSpecResponse,
};
use compute_api::spec::ComputeSpec;
use reqwest::StatusCode;
use tokio_postgres::Client;
use tracing::{error, info, instrument, warn};
use crate::config;
use crate::metrics::{CPLANE_REQUESTS_TOTAL, CPlaneRequestRPC, UNKNOWN_HTTP_STATUS};
use crate::migration::MigrationRunner;
use crate::params::PG_HBA_ALL_MD5;
use crate::pg_helpers::*;
// Do control plane request and return response if any. In case of error it
// returns a bool flag indicating whether it makes sense to retry the request

View File

@@ -1,18 +1,18 @@
use std::collections::{HashMap, HashSet};
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::iter::empty;
use std::iter::once;
use std::iter::{empty, once};
use std::sync::Arc;
use crate::compute::construct_superuser_query;
use crate::pg_helpers::{escape_literal, DatabaseExt, Escaping, GenericOptionsSearch, RoleExt};
use anyhow::Result;
use compute_api::spec::{ComputeFeature, ComputeSpec, Database, PgIdent, Role};
use futures::future::join_all;
use tokio::sync::RwLock;
use tokio_postgres::Client;
use tracing::{debug, info_span, warn, Instrument};
use tracing::{Instrument, debug, info_span, warn};
use crate::compute::construct_superuser_query;
use crate::pg_helpers::{DatabaseExt, Escaping, GenericOptionsSearch, RoleExt, escape_literal};
#[derive(Clone)]
pub enum DB {
@@ -474,7 +474,10 @@ async fn get_operations<'a>(
let edb = match databases.get(&db.name) {
Some(edb) => edb,
None => {
warn!("skipping RunInEachDatabase phase {:?}, database {} doesn't exist in PostgreSQL", subphase, db.name);
warn!(
"skipping RunInEachDatabase phase {:?}, database {} doesn't exist in PostgreSQL",
subphase, db.name
);
return Ok(Box::new(empty()));
}
};

View File

@@ -1,6 +1,6 @@
use std::path::Path;
use anyhow::{anyhow, Context};
use anyhow::{Context, anyhow};
use tracing::{instrument, warn};
pub const RESIZE_SWAP_BIN: &str = "/neonvm/bin/resize-swap";

View File

@@ -1,7 +1,7 @@
#[cfg(test)]
mod config_tests {
use std::fs::{remove_file, File};
use std::fs::{File, remove_file};
use std::io::{Read, Write};
use std::path::Path;

View File

@@ -1,7 +1,7 @@
[package]
name = "compute_api"
version = "0.1.0"
edition.workspace = true
edition = "2024"
license.workspace = true
[dependencies]

View File

@@ -1,11 +1,10 @@
//! Structs representing the JSON formats used in the compute_ctl's HTTP API.
use crate::{
privilege::Privilege,
responses::ComputeCtlConfig,
spec::{ComputeSpec, ExtVersion, PgIdent},
};
use serde::{Deserialize, Serialize};
use crate::privilege::Privilege;
use crate::responses::ComputeCtlConfig;
use crate::spec::{ComputeSpec, ExtVersion, PgIdent};
/// Request of the /configure API
///
/// We now pass only `spec` in the configuration request, but later we can

View File

@@ -6,10 +6,8 @@ use chrono::{DateTime, Utc};
use jsonwebtoken::jwk::JwkSet;
use serde::{Deserialize, Serialize, Serializer};
use crate::{
privilege::Privilege,
spec::{ComputeSpec, Database, ExtVersion, PgIdent, Role},
};
use crate::privilege::Privilege;
use crate::spec::{ComputeSpec, Database, ExtVersion, PgIdent, Role};
#[derive(Serialize, Debug, Deserialize)]
pub struct GenericAPIError {

View File

@@ -5,13 +5,12 @@
//! and connect it to the storage nodes.
use std::collections::HashMap;
use regex::Regex;
use remote_storage::RemotePath;
use serde::{Deserialize, Serialize};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use regex::Regex;
use remote_storage::RemotePath;
/// String type alias representing Postgres identifier and
/// intended to be used for DB / role names.
pub type PgIdent = String;
@@ -339,9 +338,10 @@ pub struct JwksSettings {
#[cfg(test)]
mod tests {
use super::*;
use std::fs::File;
use super::*;
#[test]
fn allow_installing_remote_extensions() {
let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({