diff --git a/.github/workflows/_push-to-acr.yml b/.github/workflows/_push-to-acr.yml index 7b6eba2c06..c304172ff7 100644 --- a/.github/workflows/_push-to-acr.yml +++ b/.github/workflows/_push-to-acr.yml @@ -26,15 +26,9 @@ on: description: Azure tenant ID required: true type: string - skip_if: - description: Skip the job if this expression is true - required: true - type: boolean jobs: push-to-acr: - if: ${{ !inputs.skip_if }} - runs-on: ubuntu-22.04 permissions: contents: read # This is required for actions/checkout diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index d46b8dc1f5..a210c962cb 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -849,6 +849,7 @@ jobs: done push-to-acr-dev: + if: github.ref_name == 'main' needs: [ tag, promote-images ] uses: ./.github/workflows/_push-to-acr.yml with: @@ -858,9 +859,9 @@ jobs: registry_name: ${{ vars.AZURE_DEV_REGISTRY_NAME }} subscription_id: ${{ vars.AZURE_DEV_SUBSCRIPTION_ID }} tenant_id: ${{ vars.AZURE_TENANT_ID }} - skip_if: ${{ github.ref_name != 'main' }} push-to-acr-prod: + if: github.ref_name == 'release' || github.ref_name == 'release-proxy' needs: [ tag, promote-images ] uses: ./.github/workflows/_push-to-acr.yml with: @@ -870,7 +871,6 @@ jobs: registry_name: ${{ vars.AZURE_PROD_REGISTRY_NAME }} subscription_id: ${{ vars.AZURE_PROD_SUBSCRIPTION_ID }} tenant_id: ${{ vars.AZURE_TENANT_ID }} - skip_if: ${{ !startsWith(github.ref_name, 'release') }} trigger-custom-extensions-build-and-wait: needs: [ check-permissions, tag ] @@ -948,7 +948,8 @@ jobs: deploy: needs: [ check-permissions, promote-images, tag, build-and-test-locally, trigger-custom-extensions-build-and-wait, push-to-acr-dev, push-to-acr-prod ] - if: github.ref_name == 'main' || github.ref_name == 'release' || github.ref_name == 'release-proxy' + # `!failure() && !cancelled()` is required because the workflow depends on jobs that can be skipped: `push-to-acr-dev` and `push-to-acr-prod` + if: (github.ref_name == 'main' || github.ref_name == 'release' || github.ref_name == 'release-proxy') && !failure() && !cancelled() runs-on: [ self-hosted, small ] container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest @@ -1046,7 +1047,8 @@ jobs: # The job runs on `release` branch and copies compatibility data and Neon artifact from the last *release PR* to the latest directory promote-compatibility-data: needs: [ deploy ] - if: github.ref_name == 'release' + # `!failure() && !cancelled()` is required because the workflow transitively depends on jobs that can be skipped: `push-to-acr-dev` and `push-to-acr-prod` + if: github.ref_name == 'release' && !failure() && !cancelled() runs-on: ubuntu-22.04 steps: diff --git a/control_plane/src/background_process.rs b/control_plane/src/background_process.rs index 619c5bce3e..94a072e394 100644 --- a/control_plane/src/background_process.rs +++ b/control_plane/src/background_process.rs @@ -151,7 +151,7 @@ where print!("."); io::stdout().flush().unwrap(); } - thread::sleep(RETRY_INTERVAL); + tokio::time::sleep(RETRY_INTERVAL).await; } Err(e) => { println!("error starting process {process_name:?}: {e:#}"); diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 1c94c42865..92f609761a 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -34,12 +34,14 @@ use safekeeper_api::{ DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT, DEFAULT_PG_LISTEN_PORT as
DEFAULT_SAFEKEEPER_PG_PORT, }; +use std::borrow::Cow; use std::collections::{BTreeSet, HashMap}; use std::path::PathBuf; use std::process::exit; use std::str::FromStr; use std::time::Duration; use storage_broker::DEFAULT_LISTEN_ADDR as DEFAULT_BROKER_ADDR; +use tokio::task::JoinSet; use url::Host; use utils::{ auth::{Claims, Scope}, @@ -87,35 +89,35 @@ fn main() -> Result<()> { // Check for 'neon init' command first. let subcommand_result = if sub_name == "init" { - handle_init(sub_args).map(Some) + handle_init(sub_args).map(|env| Some(Cow::Owned(env))) } else { // all other commands need an existing config - let mut env = - LocalEnv::load_config(&local_env::base_path()).context("Error loading config")?; - let original_env = env.clone(); + let env = LocalEnv::load_config(&local_env::base_path()).context("Error loading config")?; + let original_env = env.clone(); + let env = Box::leak(Box::new(env)); let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); let subcommand_result = match sub_name { - "tenant" => rt.block_on(handle_tenant(sub_args, &mut env)), - "timeline" => rt.block_on(handle_timeline(sub_args, &mut env)), - "start" => rt.block_on(handle_start_all(&env, get_start_timeout(sub_args))), - "stop" => rt.block_on(handle_stop_all(sub_args, &env)), - "pageserver" => rt.block_on(handle_pageserver(sub_args, &env)), - "storage_controller" => rt.block_on(handle_storage_controller(sub_args, &env)), - "storage_broker" => rt.block_on(handle_storage_broker(sub_args, &env)), - "safekeeper" => rt.block_on(handle_safekeeper(sub_args, &env)), - "endpoint" => rt.block_on(handle_endpoint(sub_args, &env)), - "mappings" => handle_mappings(sub_args, &mut env), + "tenant" => rt.block_on(handle_tenant(sub_args, env)), + "timeline" => rt.block_on(handle_timeline(sub_args, env)), + "start" => rt.block_on(handle_start_all(env, get_start_timeout(sub_args))), + "stop" => rt.block_on(handle_stop_all(sub_args, env)), + "pageserver" => rt.block_on(handle_pageserver(sub_args, env)), + "storage_controller" => rt.block_on(handle_storage_controller(sub_args, env)), + "storage_broker" => rt.block_on(handle_storage_broker(sub_args, env)), + "safekeeper" => rt.block_on(handle_safekeeper(sub_args, env)), + "endpoint" => rt.block_on(handle_endpoint(sub_args, env)), + "mappings" => handle_mappings(sub_args, env), "pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"), _ => bail!("unexpected subcommand {sub_name}"), }; - if original_env != env { - subcommand_result.map(|()| Some(env)) + if &original_env != env { + subcommand_result.map(|()| Some(Cow::Borrowed(env))) } else { subcommand_result.map(|()| None) } @@ -1273,48 +1275,95 @@ async fn handle_storage_broker(sub_match: &ArgMatches, env: &local_env::LocalEnv } async fn handle_start_all( - env: &local_env::LocalEnv, + env: &'static local_env::LocalEnv, retry_timeout: &Duration, ) -> anyhow::Result<()> { + let Err(errors) = handle_start_all_impl(env, *retry_timeout).await else { + neon_start_status_check(env, retry_timeout) + .await + .context("status check after successful startup of all services")?; + return Ok(()); + }; + + eprintln!("startup failed because one or more services could not be started"); + + for e in errors { + eprintln!("{e}"); + let debug_repr = format!("{e:?}"); + for line in debug_repr.lines() { + eprintln!(" {line}"); + } + } + + try_stop_all(env, true).await; + + exit(2); +} + +/// Returns Ok() if and only if all services could be started successfully. 
+/// Otherwise, returns the list of errors that occurred during startup. +async fn handle_start_all_impl( + env: &'static local_env::LocalEnv, + retry_timeout: Duration, +) -> Result<(), Vec<anyhow::Error>> { // Endpoints are not started automatically - broker::start_broker_process(env, retry_timeout).await?; + let mut js = JoinSet::new(); - // Only start the storage controller if the pageserver is configured to need it - if env.control_plane_api.is_some() { - let storage_controller = StorageController::from_env(env); - if let Err(e) = storage_controller - .start(NeonStorageControllerStartArgs::with_default_instance_id( - (*retry_timeout).into(), - )) - .await - { - eprintln!("storage_controller start failed: {:#}", e); - try_stop_all(env, true).await; - exit(1); + // force infallibility through closure + #[allow(clippy::redundant_closure_call)] + (|| { + js.spawn(async move { + let retry_timeout = retry_timeout; + broker::start_broker_process(env, &retry_timeout).await + }); + + // Only start the storage controller if the pageserver is configured to need it + if env.control_plane_api.is_some() { + js.spawn(async move { + let storage_controller = StorageController::from_env(env); + storage_controller + .start(NeonStorageControllerStartArgs::with_default_instance_id( + retry_timeout.into(), + )) + .await + .map_err(|e| e.context("start storage_controller")) + }); + } + + for ps_conf in &env.pageservers { + js.spawn(async move { + let pageserver = PageServerNode::from_env(env, ps_conf); + pageserver + .start(&retry_timeout) + .await + .map_err(|e| e.context(format!("start pageserver {}", ps_conf.id))) + }); + } + + for node in env.safekeepers.iter() { + js.spawn(async move { + let safekeeper = SafekeeperNode::from_env(env, node); + safekeeper + .start(vec![], &retry_timeout) + .await + .map_err(|e| e.context(format!("start safekeeper {}", safekeeper.id))) + }); + } + })(); + + let mut errors = Vec::new(); + while let Some(result) = js.join_next().await { + let result = result.expect("we don't panic or cancel the tasks"); + if let Err(e) = result { + errors.push(e); } } - for ps_conf in &env.pageservers { - let pageserver = PageServerNode::from_env(env, ps_conf); - if let Err(e) = pageserver.start(retry_timeout).await { - eprintln!("pageserver {} start failed: {:#}", ps_conf.id, e); - try_stop_all(env, true).await; - exit(1); - } + if !errors.is_empty() { + return Err(errors); } - for node in env.safekeepers.iter() { - let safekeeper = SafekeeperNode::from_env(env, node); - if let Err(e) = safekeeper.start(vec![], retry_timeout).await { - eprintln!("safekeeper {} start failed: {:#}", safekeeper.id, e); - try_stop_all(env, false).await; - exit(1); - } - } - - neon_start_status_check(env, retry_timeout).await?; - Ok(()) } diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 9f879c4b08..7554a03a68 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -702,7 +702,7 @@ impl Endpoint { } } } - std::thread::sleep(ATTEMPT_INTERVAL); + tokio::time::sleep(ATTEMPT_INTERVAL).await; } // disarm the scopeguard, let the child outlive this function (and neon_local invocation) diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 33ca70af96..cae9416af6 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -17,9 +17,7 @@ use std::time::Duration; use anyhow::{bail, Context}; use camino::Utf8PathBuf; -use pageserver_api::models::{ - self, AuxFilePolicy, LocationConfig, TenantHistorySize, TenantInfo,
TimelineInfo, -}; +use pageserver_api::models::{self, AuxFilePolicy, TenantInfo, TimelineInfo}; use pageserver_api::shard::TenantShardId; use pageserver_client::mgmt_api; use postgres_backend::AuthType; @@ -324,22 +322,6 @@ impl PageServerNode { background_process::stop_process(immediate, "pageserver", &self.pid_file()) } - pub async fn page_server_psql_client( - &self, - ) -> anyhow::Result<( - tokio_postgres::Client, - tokio_postgres::Connection<tokio_postgres::Socket, tokio_postgres::tls::NoTlsStream>, - )> { - let mut config = self.pg_connection_config.clone(); - if self.conf.pg_auth_type == AuthType::NeonJWT { - let token = self - .env - .generate_auth_token(&Claims::new(None, Scope::PageServerApi))?; - config = config.set_password(Some(token)); - } - Ok(config.connect_no_tls().await?) - } - pub async fn check_status(&self) -> mgmt_api::Result<()> { self.http_client.status().await } @@ -540,19 +522,6 @@ impl PageServerNode { Ok(()) } - pub async fn location_config( - &self, - tenant_shard_id: TenantShardId, - config: LocationConfig, - flush_ms: Option<Duration>, - lazy: bool, - ) -> anyhow::Result<()> { - Ok(self - .http_client - .location_config(tenant_shard_id, config, flush_ms, lazy) - .await?) - } - pub async fn timeline_list( &self, tenant_shard_id: &TenantShardId, @@ -636,14 +605,4 @@ impl PageServerNode { Ok(()) } - - pub async fn tenant_synthetic_size( - &self, - tenant_shard_id: TenantShardId, - ) -> anyhow::Result<TenantHistorySize> { - Ok(self - .http_client - .tenant_synthetic_size(tenant_shard_id) - .await?) - } } diff --git a/control_plane/src/postgresql_conf.rs b/control_plane/src/postgresql_conf.rs index 638575eb82..5aee12dc97 100644 --- a/control_plane/src/postgresql_conf.rs +++ b/control_plane/src/postgresql_conf.rs @@ -4,13 +4,10 @@ /// NOTE: This doesn't implement the full, correct postgresql.conf syntax. Just /// enough to extract a few settings we need in Neon, assuming you don't do /// funny stuff like include-directives or funny escaping. -use anyhow::{bail, Context, Result}; use once_cell::sync::Lazy; use regex::Regex; use std::collections::HashMap; use std::fmt; -use std::io::BufRead; -use std::str::FromStr; /// In-memory representation of a postgresql.conf file #[derive(Default, Debug)] @@ -19,84 +16,16 @@ pub struct PostgresConf { hash: HashMap<String, String>, } -static CONF_LINE_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"^((?:\w|\.)+)\s*=\s*(\S+)$").unwrap()); - impl PostgresConf { pub fn new() -> PostgresConf { PostgresConf::default() } - /// Read file into memory - pub fn read(read: impl std::io::Read) -> Result<PostgresConf> { - let mut result = Self::new(); - - for line in std::io::BufReader::new(read).lines() { - let line = line?; - - // Store each line in a vector, in original format - result.lines.push(line.clone()); - - // Also parse each line and insert key=value lines into a hash map. - // - // FIXME: This doesn't match exactly the flex/bison grammar in PostgreSQL. - // But it's close enough for our usage. - let line = line.trim(); - if line.starts_with('#') { - // comment, ignore - continue; - } else if let Some(caps) = CONF_LINE_RE.captures(line) { - let name = caps.get(1).unwrap().as_str(); - let raw_val = caps.get(2).unwrap().as_str(); - - if let Ok(val) = deescape_str(raw_val) { - // Note: if there's already an entry in the hash map for - // this key, this will replace it. That's the behavior - // we want; when PostgreSQL reads the file, each line - // overrides any previous value for the same setting.
- result.hash.insert(name.to_string(), val.to_string()); - } - } - } - Ok(result) - } - /// Return the current value of 'option' pub fn get(&self, option: &str) -> Option<&str> { self.hash.get(option).map(|x| x.as_ref()) } - /// Return the current value of a field, parsed to the right datatype. - /// - /// This calls the FromStr::parse() function on the value of the field. If - /// the field does not exist, or parsing fails, returns an error. - /// - pub fn parse_field<T>(&self, field_name: &str, context: &str) -> Result<T> - where - T: FromStr, - <T as FromStr>::Err: std::error::Error + Send + Sync + 'static, - { - self.get(field_name) - .with_context(|| format!("could not find '{}' option {}", field_name, context))? - .parse::<T>() - .with_context(|| format!("could not parse '{}' option {}", field_name, context)) - } - - pub fn parse_field_optional<T>(&self, field_name: &str, context: &str) -> Result<Option<T>> - where - T: FromStr, - <T as FromStr>::Err: std::error::Error + Send + Sync + 'static, - { - if let Some(val) = self.get(field_name) { - let result = val - .parse::<T>() - .with_context(|| format!("could not parse '{}' option {}", field_name, context))?; - - Ok(Some(result)) - } else { - Ok(None) - } - } - /// /// Note: if you call this multiple times for the same option, the config /// file will have a line for each call. It would be nice to have a function @@ -154,48 +83,8 @@ fn escape_str(s: &str) -> String { } } -/// De-escape a possibly-quoted value. -/// -/// See `DeescapeQuotedString` function in PostgreSQL sources for how PostgreSQL -/// does this. -fn deescape_str(s: &str) -> Result<String> { - // If the string has a quote at the beginning and end, strip them out. - if s.len() >= 2 && s.starts_with('\'') && s.ends_with('\'') { - let mut result = String::new(); - - let mut iter = s[1..(s.len() - 1)].chars().peekable(); - while let Some(c) = iter.next() { - let newc = if c == '\\' { - match iter.next() { - Some('b') => '\x08', - Some('f') => '\x0c', - Some('n') => '\n', - Some('r') => '\r', - Some('t') => '\t', - Some('0'..='7') => { - // TODO - bail!("octal escapes not supported"); - } - Some(n) => n, - None => break, - } - } else if c == '\'' && iter.peek() == Some(&'\'') { - // doubled quote becomes just one quote - iter.next().unwrap() - } else { - c - }; - - result.push(newc); - } - Ok(result) - } else { - Ok(s.to_string()) - } -} - #[test] -fn test_postgresql_conf_escapes() -> Result<()> { +fn test_postgresql_conf_escapes() -> anyhow::Result<()> { assert_eq!(escape_str("foo bar"), "'foo bar'"); // these don't need to be quoted assert_eq!(escape_str("foo"), "foo"); @@ -214,13 +103,5 @@ fn test_postgresql_conf_escapes() -> anyhow::Result<()> { assert_eq!(escape_str("fo\\o"), "'fo\\\\o'"); assert_eq!(escape_str("10 cats"), "'10 cats'"); - // Test de-escaping - assert_eq!(deescape_str(&escape_str("foo"))?, "foo"); - assert_eq!(deescape_str(&escape_str("fo'o\nba\\r"))?, "fo'o\nba\\r"); - assert_eq!(deescape_str("'\\b\\f\\n\\r\\t'")?, "\x08\x0c\n\r\t"); - - // octal-escapes are currently not supported - assert!(deescape_str("'foo\\7\\07\\007'").is_err()); - Ok(()) } diff --git a/libs/consumption_metrics/src/lib.rs b/libs/consumption_metrics/src/lib.rs index 810196aff6..fbe2e6830f 100644 --- a/libs/consumption_metrics/src/lib.rs +++ b/libs/consumption_metrics/src/lib.rs @@ -5,7 +5,7 @@ use chrono::{DateTime, Utc}; use rand::Rng; use serde::{Deserialize, Serialize}; -#[derive(Serialize, serde::Deserialize, Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)] +#[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
#[serde(tag = "type")] pub enum EventType { #[serde(rename = "absolute")] @@ -107,7 +107,7 @@ pub const CHUNK_SIZE: usize = 1000; // Just a wrapper around a slice of events // to serialize it as `{"events" : [ ] } -#[derive(serde::Serialize, serde::Deserialize)] +#[derive(serde::Serialize, Deserialize)] pub struct EventChunk<'a, T: Clone> { pub events: std::borrow::Cow<'a, [T]>, } diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index 1194ee93ef..61e32bc9ab 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -173,40 +173,6 @@ impl Default for EvictionOrder { } } -#[derive( - Eq, - PartialEq, - Debug, - Copy, - Clone, - strum_macros::EnumString, - strum_macros::Display, - serde_with::DeserializeFromStr, - serde_with::SerializeDisplay, -)] -#[strum(serialize_all = "kebab-case")] -pub enum GetVectoredImpl { - Sequential, - Vectored, -} - -#[derive( - Eq, - PartialEq, - Debug, - Copy, - Clone, - strum_macros::EnumString, - strum_macros::Display, - serde_with::DeserializeFromStr, - serde_with::SerializeDisplay, -)] -#[strum(serialize_all = "kebab-case")] -pub enum GetImpl { - Legacy, - Vectored, -} - #[derive(Copy, Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] #[serde(transparent)] pub struct MaxVectoredReadBytes(pub NonZeroUsize); @@ -338,8 +304,6 @@ pub mod defaults { pub const DEFAULT_IMAGE_COMPRESSION: ImageCompressionAlgorithm = ImageCompressionAlgorithm::Zstd { level: Some(1) }; - pub const DEFAULT_VALIDATE_VECTORED_GET: bool = false; - pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0; pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512; @@ -376,7 +340,10 @@ impl Default for ConfigToml { concurrent_tenant_warmup: (NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP) .expect("Invalid default constant")), - concurrent_tenant_size_logical_size_queries: NonZeroUsize::new(1).unwrap(), + concurrent_tenant_size_logical_size_queries: NonZeroUsize::new( + DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES, + ) + .unwrap(), metric_collection_interval: (humantime::parse_duration( DEFAULT_METRIC_COLLECTION_INTERVAL, ) @@ -467,8 +434,6 @@ pub mod tenant_conf_defaults { // By default ingest enough WAL for two new L0 layers before checking if new image // image layers should be created. 
pub const DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD: u8 = 2; - - pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100; } impl Default for TenantConfigToml { diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 45e84baa1f..c9be53f0b0 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -495,7 +495,7 @@ pub struct CompactionAlgorithmSettings { pub kind: CompactionAlgorithm, } -#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)] +#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)] #[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)] pub enum L0FlushConfig { #[serde(rename_all = "snake_case")] diff --git a/libs/postgres_backend/src/lib.rs b/libs/postgres_backend/src/lib.rs index 8ea4b93fb1..e274d24585 100644 --- a/libs/postgres_backend/src/lib.rs +++ b/libs/postgres_backend/src/lib.rs @@ -280,16 +280,6 @@ pub struct PostgresBackend<IO> { pub type PostgresBackendTCP = PostgresBackend<tokio::net::TcpStream>; -pub fn query_from_cstring(query_string: Bytes) -> Vec<u8> { - let mut query_string = query_string.to_vec(); - if let Some(ch) = query_string.last() { - if *ch == 0 { - query_string.pop(); - } - } - query_string -} - /// Cast a byte slice to a string slice, dropping null terminator if there's one. fn cstr_to_str(bytes: &[u8]) -> anyhow::Result<&str> { let without_null = bytes.strip_suffix(&[0]).unwrap_or(bytes); diff --git a/libs/postgres_ffi/src/pg_constants.rs b/libs/postgres_ffi/src/pg_constants.rs index 61b49a634d..497d011d7a 100644 --- a/libs/postgres_ffi/src/pg_constants.rs +++ b/libs/postgres_ffi/src/pg_constants.rs @@ -9,8 +9,8 @@ //! comments on them. //! +use crate::PageHeaderData; use crate::BLCKSZ; -use crate::{PageHeaderData, XLogRecord}; // // From pg_tablespace_d.h @@ -194,8 +194,6 @@ pub const XLR_RMGR_INFO_MASK: u8 = 0xF0; pub const XLOG_TBLSPC_CREATE: u8 = 0x00; pub const XLOG_TBLSPC_DROP: u8 = 0x10; -pub const SIZEOF_XLOGRECORD: u32 = size_of::<XLogRecord>() as u32; // // from xlogrecord.h // @@ -219,8 +217,6 @@ pub const BKPIMAGE_HAS_HOLE: u8 = 0x01; /* page image has "hole" */ /* From transam.h */ pub const FIRST_NORMAL_TRANSACTION_ID: u32 = 3; pub const INVALID_TRANSACTION_ID: u32 = 0; -pub const FIRST_BOOTSTRAP_OBJECT_ID: u32 = 12000; -pub const FIRST_NORMAL_OBJECT_ID: u32 = 16384; /* pg_control.h */ pub const XLOG_CHECKPOINT_SHUTDOWN: u8 = 0x00; diff --git a/libs/postgres_ffi/wal_craft/src/lib.rs b/libs/postgres_ffi/wal_craft/src/lib.rs index 949e3f4251..ddaafe65f1 100644 --- a/libs/postgres_ffi/wal_craft/src/lib.rs +++ b/libs/postgres_ffi/wal_craft/src/lib.rs @@ -26,7 +26,6 @@ macro_rules! xlog_utils_test { postgres_ffi::for_all_postgres_versions! { xlog_utils_test } -#[derive(Debug, Clone, PartialEq, Eq)] pub struct Conf { pub pg_version: u32, pub pg_distrib_dir: PathBuf, diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index b5b69c9faf..45267ccda9 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -127,10 +127,6 @@ impl RemotePath { &self.0 } - pub fn extension(&self) -> Option<&str> { - self.0.extension() - } - pub fn strip_prefix(&self, p: &RemotePath) -> Result<&Utf8Path, std::path::StripPrefixError> { self.0.strip_prefix(&p.0) } diff --git a/libs/utils/src/vec_map.rs b/libs/utils/src/vec_map.rs index 5f0028bacd..1fe048c6f0 100644 --- a/libs/utils/src/vec_map.rs +++ b/libs/utils/src/vec_map.rs @@ -120,32 +120,6 @@ impl<K: Ord, V> VecMap<K, V> { Ok((None, delta_size)) } - /// Split the map into two.
- /// - /// The left map contains everything before `cutoff` (exclusive). - /// Right map contains `cutoff` and everything after (inclusive). - pub fn split_at(&self, cutoff: &K) -> (Self, Self) - where - K: Clone, - V: Clone, - { - let split_idx = self - .data - .binary_search_by_key(&cutoff, extract_key) - .unwrap_or_else(std::convert::identity); - - ( - VecMap { - data: self.data[..split_idx].to_vec(), - ordering: self.ordering, - }, - VecMap { - data: self.data[split_idx..].to_vec(), - ordering: self.ordering, - }, - ) - } - /// Move items from `other` to the end of `self`, leaving `other` empty. /// If the `other` ordering is different from `self` ordering /// `ExtendOrderingError` error will be returned. diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index e9f197ec2d..8567c6aa52 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -13,7 +13,6 @@ use pageserver_api::{ use remote_storage::{RemotePath, RemoteStorageConfig}; use std::env; use storage_broker::Uri; -use utils::crashsafe::path_with_suffix_extension; use utils::logging::SecretString; use once_cell::sync::OnceCell; @@ -33,7 +32,7 @@ use crate::tenant::storage_layer::inmemory_layer::IndexEntry; use crate::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME}; use crate::virtual_file; use crate::virtual_file::io_engine; -use crate::{TENANT_HEATMAP_BASENAME, TENANT_LOCATION_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX}; +use crate::{TENANT_HEATMAP_BASENAME, TENANT_LOCATION_CONFIG_NAME}; /// Global state of pageserver. /// @@ -257,17 +256,6 @@ impl PageServerConf { .join(timeline_id.to_string()) } - pub(crate) fn timeline_delete_mark_file_path( - &self, - tenant_shard_id: TenantShardId, - timeline_id: TimelineId, - ) -> Utf8PathBuf { - path_with_suffix_extension( - self.timeline_path(&tenant_shard_id, &timeline_id), - TIMELINE_DELETE_MARK_SUFFIX, - ) - } - /// Turns storage remote path of a file into its local path. pub fn local_path(&self, remote_path: &RemotePath) -> Utf8PathBuf { remote_path.with_base(&self.workdir) @@ -491,11 +479,6 @@ pub struct ConfigurableSemaphore { } impl ConfigurableSemaphore { - pub const DEFAULT_INITIAL: NonZeroUsize = match NonZeroUsize::new(1) { - Some(x) => x, - None => panic!("const unwrap is not yet stable"), - }; - /// Initialize using a non-zero amount of permits. /// /// Require a non-zero initial permits, because using permits == 0 is a crude way to disable a @@ -516,12 +499,6 @@ impl ConfigurableSemaphore { } } -impl Default for ConfigurableSemaphore { - fn default() -> Self { - Self::new(Self::DEFAULT_INITIAL) - } -} - impl PartialEq for ConfigurableSemaphore { fn eq(&self, other: &Self) -> bool { // the number of permits can be increased at runtime, so we cannot really fulfill the diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index dc4118bb4a..90db08ea81 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -135,25 +135,6 @@ async fn delete_remote_layers_and_index(timeline: &Timeline) -> anyhow::Result<( .context("delete_all") } -// This function removes remaining traces of a timeline on disk. -// Namely: metadata file, timeline directory, delete mark. -// Note: io::ErrorKind::NotFound are ignored for metadata and timeline dir. -// delete mark should be present because it is the last step during deletion.
- // (nothing can fail after its deletion) -async fn cleanup_remaining_timeline_fs_traces( - conf: &PageServerConf, - tenant_shard_id: TenantShardId, - timeline_id: TimelineId, -) -> anyhow::Result<()> { - // Remove delete mark - // TODO: once we are confident that no more exist in the field, remove this - // line. It cleans up a legacy marker file that might in rare cases be present. - tokio::fs::remove_file(conf.timeline_delete_mark_file_path(tenant_shard_id, timeline_id)) - .await - .or_else(fs_ext::ignore_not_found) - .context("remove delete mark") -} - /// It is important that this gets called when DeletionGuard is being held. /// For more context see comments in [`DeleteTimelineFlow::prepare`] async fn remove_timeline_from_tenant( @@ -194,12 +175,10 @@ async fn remove_timeline_from_tenant( /// 7. Delete mark file /// /// It is resumable from any step in case a crash/restart occurs. -/// There are three entrypoints to the process: +/// There are two entrypoints to the process: /// 1. [`DeleteTimelineFlow::run`] this is the main one called by a management api handler. /// 2. [`DeleteTimelineFlow::resume_deletion`] is called during restarts when local metadata is still present /// and we possibly need to continue deletion of remote files. -/// 3. [`DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces`] is used when we deleted remote -/// index but still have local metadata, timeline directory and delete mark. /// /// Note the only other place that messes around timeline delete mark is the logic that scans directory with timelines during tenant load. #[derive(Default)] @@ -311,18 +290,6 @@ impl DeleteTimelineFlow { Ok(()) } - #[instrument(skip_all, fields(%timeline_id))] - pub async fn cleanup_remaining_timeline_fs_traces( - tenant: &Tenant, - timeline_id: TimelineId, - ) -> anyhow::Result<()> { - let r = - cleanup_remaining_timeline_fs_traces(tenant.conf, tenant.tenant_shard_id, timeline_id) - .await; - info!("Done"); - r - } - fn prepare( tenant: &Tenant, timeline_id: TimelineId, diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs index e5c5b47795..2ac66ffe8c 100644 --- a/proxy/src/bin/proxy.rs +++ b/proxy/src/bin/proxy.rs @@ -62,12 +62,13 @@ static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; #[derive(Clone, Debug, ValueEnum)] enum AuthBackendType { Console, - #[cfg(feature = "testing")] - Postgres, // clap only shows the name, not the alias, in usage text.
// TODO: swap name/alias and deprecate "link" #[value(name("link"), alias("web"))] Web, + + #[cfg(feature = "testing")] + Postgres, } /// Neon proxy/router @@ -639,17 +640,19 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { let api = console::provider::ConsoleBackend::Console(api); auth::Backend::Console(MaybeOwned::Owned(api), ()) } - #[cfg(feature = "testing")] - AuthBackendType::Postgres => { - let url = args.auth_endpoint.parse()?; - let api = console::provider::mock::Api::new(url); - let api = console::provider::ConsoleBackend::Postgres(api); - auth::Backend::Console(MaybeOwned::Owned(api), ()) - } + AuthBackendType::Web => { let url = args.uri.parse()?; auth::Backend::Web(MaybeOwned::Owned(url), ()) } + + #[cfg(feature = "testing")] + AuthBackendType::Postgres => { + let url = args.auth_endpoint.parse()?; + let api = console::provider::mock::Api::new(url, !args.is_private_access_proxy); + let api = console::provider::ConsoleBackend::Postgres(api); + auth::Backend::Console(MaybeOwned::Owned(api), ()) + } }; let config::ConcurrencyLockOptions { diff --git a/proxy/src/console/provider/mock.rs b/proxy/src/console/provider/mock.rs index 08b87cd87a..1b77418de6 100644 --- a/proxy/src/console/provider/mock.rs +++ b/proxy/src/console/provider/mock.rs @@ -41,11 +41,15 @@ impl From<tokio_postgres::Error> for ApiError { #[derive(Clone)] pub struct Api { endpoint: ApiUrl, + ip_allowlist_check_enabled: bool, } impl Api { - pub fn new(endpoint: ApiUrl) -> Self { - Self { endpoint } + pub fn new(endpoint: ApiUrl, ip_allowlist_check_enabled: bool) -> Self { + Self { + endpoint, + ip_allowlist_check_enabled, + } } pub(crate) fn url(&self) -> &str { @@ -64,6 +68,7 @@ impl Api { tokio_postgres::connect(self.endpoint.as_str(), tokio_postgres::NoTls).await?; tokio::spawn(connection); + let secret = if let Some(entry) = get_execute_postgres_query( &client, "select rolpassword from pg_catalog.pg_authid where rolname = $1", @@ -79,21 +84,26 @@ impl Api { warn!("user '{}' does not exist", user_info.user); None }; - let allowed_ips = match get_execute_postgres_query( - &client, - "select allowed_ips from neon_control_plane.endpoints where endpoint_id = $1", - &[&user_info.endpoint.as_str()], - "allowed_ips", - ) - .await? - { - Some(s) => { - info!("got allowed_ips: {s}"); - s.split(',') - .map(|s| IpPattern::from_str(s).unwrap()) - .collect() + + let allowed_ips = if self.ip_allowlist_check_enabled { + match get_execute_postgres_query( + &client, + "select allowed_ips from neon_control_plane.endpoints where endpoint_id = $1", + &[&user_info.endpoint.as_str()], + "allowed_ips", + ) + .await? + { + Some(s) => { + info!("got allowed_ips: {s}"); + s.split(',') + .map(|s| IpPattern::from_str(s).unwrap()) + .collect() + } + None => vec![], } - None => vec![], + } else { + vec![] }; Ok((secret, allowed_ips)) diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs index 64585f5edc..c772ae6de7 100644 --- a/safekeeper/src/pull_timeline.rs +++ b/safekeeper/src/pull_timeline.rs @@ -278,7 +278,7 @@ impl WalResidentTimeline { } /// pull_timeline request body. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Deserialize)] pub struct Request { pub tenant_id: TenantId, pub timeline_id: TimelineId, @@ -293,7 +293,7 @@ pub struct Response { } /// Response for debug dump request.
-#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Deserialize)] pub struct DebugDumpResponse { pub start_time: DateTime<Utc>, pub finish_time: DateTime<Utc>, diff --git a/storage_scrubber/src/main.rs b/storage_scrubber/src/main.rs index c5961753c5..ee133e2e58 100644 --- a/storage_scrubber/src/main.rs +++ b/storage_scrubber/src/main.rs @@ -121,8 +121,6 @@ enum Command { async fn main() -> anyhow::Result<()> { let cli = Cli::parse(); - tracing::info!("version: {}, build_tag {}", GIT_VERSION, BUILD_TAG); - let bucket_config = BucketConfig::from_env()?; let command_log_name = match &cli.command { @@ -142,6 +140,8 @@ async fn main() -> anyhow::Result<()> { chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S") )); + tracing::info!("version: {}, build_tag {}", GIT_VERSION, BUILD_TAG); + let controller_client = cli.controller_api.map(|controller_api| { ControllerClientConfig { controller_api, diff --git a/test_runner/regress/test_postgres_version.py b/test_runner/regress/test_postgres_version.py index 4145a303c6..d8626c15a5 100644 --- a/test_runner/regress/test_postgres_version.py +++ b/test_runner/regress/test_postgres_version.py @@ -30,9 +30,8 @@ def test_postgres_version(base_dir: Path, pg_bin: PgBin, pg_version: PgVersion): version = match.group("version") commit = match.group("commit") - if "." in version: - assert ( - pg_version.v_prefixed in expected_revisions - ), f"Released PostgreSQL version `{pg_version.v_prefixed}` doesn't exist in `vendor/revisions.json`, please update it if these changes are intentional" - msg = f"Unexpected Postgres {pg_version} version: `{output}`, please update `vendor/revisions.json` if these changes are intentional" - assert [version, commit] == expected_revisions[pg_version.v_prefixed], msg + assert ( + pg_version.v_prefixed in expected_revisions + ), f"Released PostgreSQL version `{pg_version.v_prefixed}` doesn't exist in `vendor/revisions.json`, please update it if these changes are intentional" + msg = f"Unexpected Postgres {pg_version} version: `{output}`, please update `vendor/revisions.json` if these changes are intentional" + assert [version, commit] == expected_revisions[pg_version.v_prefixed], msg diff --git a/vendor/revisions.json b/vendor/revisions.json index 3a65a507f3..c2c34962bb 100644 --- a/vendor/revisions.json +++ b/vendor/revisions.json @@ -1,4 +1,8 @@ { + "v17": [ + "17rc1", + "9156d63ce253bed9d1f76355ceec610e444eaffa" + ], "v16": [ "16.4", "0baa7346dfd42d61912eeca554c9bb0a190f0a1e"