diff --git a/Cargo.lock b/Cargo.lock
index 8897364701..d08da0babd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1368,6 +1368,7 @@ dependencies = [
  "tracing",
  "url",
  "utils",
+ "whoami",
  "workspace_hack",
 ]
@@ -4603,6 +4604,15 @@ dependencies = [
  "bitflags 1.3.2",
 ]
 
+[[package]]
+name = "redox_syscall"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa"
+dependencies = [
+ "bitflags 1.3.2",
+]
+
 [[package]]
 name = "regex"
 version = "1.10.2"
@@ -6972,6 +6982,12 @@
 version = "0.11.0+wasi-snapshot-preview1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
 
+[[package]]
+name = "wasite"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b"
+
 [[package]]
 name = "wasm-bindgen"
 version = "0.2.92"
@@ -7124,6 +7140,17 @@ dependencies = [
  "once_cell",
 ]
 
+[[package]]
+name = "whoami"
+version = "1.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a44ab49fad634e88f55bf8f9bb3abd2f27d7204172a112c7c9987e01c1c94ea9"
+dependencies = [
+ "redox_syscall 0.4.1",
+ "wasite",
+ "web-sys",
+]
+
 [[package]]
 name = "winapi"
 version = "0.3.9"
diff --git a/Cargo.toml b/Cargo.toml
index 4f42203683..b9b4bafb4f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -191,6 +191,7 @@ uuid = { version = "1.6.1", features = ["v4", "v7", "serde"] }
 walkdir = "2.3.2"
 rustls-native-certs = "0.7"
 x509-parser = "0.15"
+whoami = "1.5.1"
 
 ## TODO replace this with tracing
 env_logger = "0.10"
diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml
index e62f3b8a47..487ac8f047 100644
--- a/control_plane/Cargo.toml
+++ b/control_plane/Cargo.toml
@@ -40,6 +40,7 @@ safekeeper_api.workspace = true
 postgres_connection.workspace = true
 storage_broker.workspace = true
 utils.workspace = true
+whoami.workspace = true
 compute_api.workspace = true
 
 workspace_hack.workspace = true
diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs
index 47103a2e0a..d7aedd711a 100644
--- a/control_plane/src/storage_controller.rs
+++ b/control_plane/src/storage_controller.rs
@@ -29,7 +29,6 @@ use utils::{
 pub struct StorageController {
     env: LocalEnv,
     listen: String,
-    path: Utf8PathBuf,
     private_key: Option<Vec<u8>>,
     public_key: Option<String>,
     postgres_port: u16,
@@ -41,6 +40,8 @@ const COMMAND: &str = "storage_controller";
 
 const STORAGE_CONTROLLER_POSTGRES_VERSION: u32 = 16;
 
+const DB_NAME: &str = "storage_controller";
+
 #[derive(Serialize, Deserialize)]
 pub struct AttachHookRequest {
     pub tenant_shard_id: TenantShardId,
@@ -65,10 +66,6 @@ pub struct InspectResponse {
 
 impl StorageController {
     pub fn from_env(env: &LocalEnv) -> Self {
-        let path = Utf8PathBuf::from_path_buf(env.base_data_dir.clone())
-            .unwrap()
-            .join("attachments.json");
-
         // Makes no sense to construct this if pageservers aren't going to use it: assume
         // pageservers have control plane API set
         let listen_url = env.control_plane_api.clone().unwrap();
@@ -128,7 +125,6 @@ impl StorageController {
 
         Self {
             env: env.clone(),
-            path,
             listen,
             private_key,
             public_key,
@@ -203,7 +199,6 @@ impl StorageController {
     ///
     /// Returns the database url
     pub async fn setup_database(&self) -> anyhow::Result<String> {
-        const DB_NAME: &str = "storage_controller";
         let database_url = format!("postgresql://localhost:{}/{DB_NAME}", self.postgres_port);
 
         let pg_bin_dir = self.get_pg_bin_dir().await?;
@@ -232,6 +227,30 @@ impl StorageController {
         Ok(database_url)
     }
 
+    pub async fn connect_to_database(
+        &self,
+    ) -> anyhow::Result<(
+        tokio_postgres::Client,
+        tokio_postgres::Connection<tokio_postgres::Socket, tokio_postgres::tls::NoTlsStream>,
+    )> {
+        tokio_postgres::Config::new()
+            .host("localhost")
+            .port(self.postgres_port)
+            // The user is the ambient operating system user name.
+            // That is an impurity which we want to fix; see TODO https://github.com/neondatabase/neon/issues/8400
+            //
+            // Until we get there, use the ambient operating system user name.
+            // Recent tokio-postgres versions default to this if the user isn't specified,
+            // but our tokio-postgres fork doesn't have this upstream commit:
+            // https://github.com/sfackler/rust-postgres/commit/cb609be758f3fb5af537f04b584a2ee0cebd5e79
+            // => we should rebase our fork; see TODO https://github.com/neondatabase/neon/issues/8399
+            .user(&whoami::username())
+            .dbname(DB_NAME)
+            .connect(tokio_postgres::NoTls)
+            .await
+            .map_err(anyhow::Error::new)
+    }
+
     pub async fn start(&self, retry_timeout: &Duration) -> anyhow::Result<()> {
         // Start a vanilla Postgres process used by the storage controller for persistence.
         let pg_data_path = Utf8PathBuf::from_path_buf(self.env.base_data_dir.clone())
@@ -256,18 +275,21 @@ impl StorageController {
             if !status.success() {
                 anyhow::bail!("initdb failed with status {status}");
             }
-
-            // Write a minimal config file:
-            // - Specify the port, since this is chosen dynamically
-            // - Switch off fsync, since we're running on lightweight test environments and when e.g. scale testing
-            //   the storage controller we don't want a slow local disk to interfere with that.
-            tokio::fs::write(
-                &pg_data_path.join("postgresql.conf"),
-                format!("port = {}\nfsync=off\n", self.postgres_port),
-            )
-            .await?;
         };
 
+        // Write a minimal config file:
+        // - Specify the port, since this is chosen dynamically
+        // - Switch off fsync, since we're running on lightweight test environments and when e.g. scale testing
+        //   the storage controller we don't want a slow local disk to interfere with that.
+        //
+        // NB: it's important that we rewrite this file on each start command so we propagate changes
+        // from `LocalEnv`'s config file (`.neon/config`).
+        tokio::fs::write(
+            &pg_data_path.join("postgresql.conf"),
+            format!("port = {}\nfsync=off\n", self.postgres_port),
+        )
+        .await?;
+
         println!("Starting storage controller database...");
         let db_start_args = [
             "-w",
@@ -296,11 +318,38 @@ impl StorageController {
         // Run migrations on every startup, in case something changed.
         let database_url = self.setup_database().await?;
 
+        // We support running a startup SQL script to fiddle with the database before we launch storcon.
+        // This is used by the test suite.
+        let startup_script_path = self
+            .env
+            .base_data_dir
+            .join("storage_controller_db.startup.sql");
+        let startup_script = match tokio::fs::read_to_string(&startup_script_path).await {
+            Ok(script) => {
+                tokio::fs::remove_file(startup_script_path).await?;
+                script
+            }
+            Err(e) => {
+                if e.kind() == std::io::ErrorKind::NotFound {
+                    // always run some startup script so that this code path doesn't bit rot
+                    "BEGIN; COMMIT;".to_string()
+                } else {
+                    anyhow::bail!("Failed to read startup script: {e}")
+                }
+            }
+        };
+        let (mut client, conn) = self.connect_to_database().await?;
+        let conn = tokio::spawn(conn);
+        let tx = client.build_transaction();
+        let tx = tx.start().await?;
+        tx.batch_execute(&startup_script).await?;
+        tx.commit().await?;
+        drop(client);
+        conn.await??;
+
         let mut args = vec![
             "-l",
             &self.listen,
-            "-p",
-            self.path.as_ref(),
             "--dev",
             "--database-url",
             &database_url,
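Aside, not part of the patch: `connect_to_database()` hands back the raw `(Client, Connection)` pair from tokio-postgres, and every caller has to drive the `Connection` future itself. A minimal, self-contained sketch of that pattern, assuming an illustrative port value and the same ambient-user choice as in the patch:

```rust
// Sketch only: how a (Client, Connection) pair from tokio-postgres must be
// driven. The Connection is a future that services the socket; until it is
// polled (here via tokio::spawn), queries on the Client make no progress.
use tokio_postgres::NoTls;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (client, connection) = tokio_postgres::Config::new()
        .host("localhost")
        .port(1235) // assumption: whatever port the storcon database listens on
        .user(&whoami::username()) // ambient OS user, as in the patch above
        .dbname("storage_controller")
        .connect(NoTls)
        .await?;

    // Drive the connection on a background task.
    let conn_task = tokio::spawn(connection);

    let row = client.query_one("SELECT 1::INT4", &[]).await?;
    assert_eq!(row.get::<_, i32>(0), 1);

    // Dropping the client closes the connection, letting the task finish cleanly.
    drop(client);
    conn_task.await??;
    Ok(())
}
```

`start()` above follows the same sequence when it applies the startup SQL script: spawn the connection, run the script in a transaction, drop the client, then await the connection task.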
diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs
index f1eb0b30fc..4bf6b528f4 100644
--- a/storage_controller/src/main.rs
+++ b/storage_controller/src/main.rs
@@ -1,5 +1,4 @@
 use anyhow::{anyhow, Context};
-use camino::Utf8PathBuf;
 use clap::Parser;
 use diesel::Connection;
 use metrics::launch_timestamp::LaunchTimestamp;
@@ -51,10 +50,6 @@ struct Cli {
     #[arg(long)]
     compute_hook_url: Option<Url>,
 
-    /// Path to the .json file to store state (will be created if it doesn't exist)
-    #[arg(short, long)]
-    path: Option<Utf8PathBuf>,
-
     /// URL to connect to postgres, like postgresql://localhost:1234/storage_controller
     #[arg(long)]
     database_url: Option<String>,
@@ -206,11 +201,10 @@ async fn async_main() -> anyhow::Result<()> {
     let args = Cli::parse();
 
     tracing::info!(
-        "version: {}, launch_timestamp: {}, build_tag {}, state at {}, listening on {}",
+        "version: {}, launch_timestamp: {}, build_tag {}, listening on {}",
         GIT_VERSION,
         launch_ts.to_string(),
         BUILD_TAG,
-        args.path.as_ref().unwrap_or(&Utf8PathBuf::from("")),
         args.listen
     );
 
@@ -277,8 +271,7 @@ async fn async_main() -> anyhow::Result<()> {
         .await
         .context("Running database migrations")?;
 
-    let json_path = args.path;
-    let persistence = Arc::new(Persistence::new(secrets.database_url, json_path.clone()));
+    let persistence = Arc::new(Persistence::new(secrets.database_url));
 
     let service = Service::spawn(config, persistence.clone()).await?;
 
@@ -316,14 +309,6 @@ async fn async_main() -> anyhow::Result<()> {
     }
 
     tracing::info!("Terminating on signal");
 
-    if json_path.is_some() {
-        // Write out a JSON dump on shutdown: this is used in compat tests to avoid passing
-        // full postgres dumps around.
-        if let Err(e) = persistence.write_tenants_json().await {
-            tracing::error!("Failed to write JSON on shutdown: {e}")
-        }
-    }
-
     // Stop HTTP server first, so that we don't have to service requests
     // while shutting down Service
     server_shutdown.cancel();
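For context on the `Running database migrations` step that survives this diff: the controller applies diesel migrations on every startup. The exact call site is not shown here, so the following is only a sketch of the usual diesel 2.x embedded-migrations pattern; the `./migrations` path and the function name are assumptions:

```rust
// Hypothetical sketch of running embedded diesel migrations at startup;
// "./migrations" is an assumed path, not taken from this diff.
use diesel::pg::PgConnection;
use diesel::Connection;
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};

const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");

fn run_migrations(database_url: &str) -> anyhow::Result<()> {
    let mut conn = PgConnection::establish(database_url)?;
    // Applies any migrations not yet recorded in __diesel_schema_migrations.
    conn.run_pending_migrations(MIGRATIONS)
        .map_err(|e| anyhow::anyhow!("migration failed: {e}"))?;
    Ok(())
}
```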
diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs
index 9f7b2f775e..d8f31e86e5 100644
--- a/storage_controller/src/persistence.rs
+++ b/storage_controller/src/persistence.rs
@@ -5,8 +5,6 @@ use std::time::Duration;
 use std::time::Instant;
 
 use self::split_state::SplitState;
-use camino::Utf8Path;
-use camino::Utf8PathBuf;
 use diesel::pg::PgConnection;
 use diesel::prelude::*;
 use diesel::Connection;
@@ -55,11 +53,6 @@ use crate::node::Node;
 /// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline.
 pub struct Persistence {
     connection_pool: diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<PgConnection>>,
-
-    // In test environments, we support loading+saving a JSON file. This is temporary, for the benefit of
-    // test_compatibility.py, so that we don't have to commit to making the database contents fully backward/forward
-    // compatible just yet.
-    json_path: Option<Utf8PathBuf>,
 }
 
 /// Legacy format, for use in JSON compat objects in test environment
@@ -124,7 +117,7 @@ impl Persistence {
     const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
     const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(60);
 
-    pub fn new(database_url: String, json_path: Option<Utf8PathBuf>) -> Self {
+    pub fn new(database_url: String) -> Self {
         let manager = diesel::r2d2::ConnectionManager::<PgConnection>::new(database_url);
 
         // We will use a connection pool: this is primarily to _limit_ our connection count, rather than to optimize time
@@ -139,10 +132,7 @@ impl Persistence {
             .build(manager)
             .expect("Could not build connection pool");
 
-        Self {
-            connection_pool,
-            json_path,
-        }
+        Self { connection_pool }
     }
 
     /// A helper for use during startup, where we would like to tolerate concurrent restarts of the
@@ -302,85 +292,13 @@ impl Persistence {
     /// At startup, load the high level state for shards, such as their config + policy. This will
     /// be enriched at runtime with state discovered on pageservers.
     pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> {
-        let loaded = self
-            .with_measured_conn(
-                DatabaseOperation::ListTenantShards,
-                move |conn| -> DatabaseResult<_> {
-                    Ok(crate::schema::tenant_shards::table.load::<TenantShardPersistence>(conn)?)
-                },
-            )
-            .await?;
-
-        if loaded.is_empty() {
-            if let Some(path) = &self.json_path {
-                if tokio::fs::try_exists(path)
-                    .await
-                    .map_err(|e| DatabaseError::Logical(format!("Error stat'ing JSON file: {e}")))?
-                {
-                    tracing::info!("Importing from legacy JSON format at {path}");
-                    return self.list_tenant_shards_json(path).await;
-                }
-            }
-        }
-        Ok(loaded)
-    }
-
-    /// Shim for automated compatibility tests: load tenants from a JSON file instead of database
-    pub(crate) async fn list_tenant_shards_json(
-        &self,
-        path: &Utf8Path,
-    ) -> DatabaseResult<Vec<TenantShardPersistence>> {
-        let bytes = tokio::fs::read(path)
-            .await
-            .map_err(|e| DatabaseError::Logical(format!("Failed to load JSON: {e}")))?;
-
-        let mut decoded = serde_json::from_slice::<JsonPersistence>(&bytes)
-            .map_err(|e| DatabaseError::Logical(format!("Deserialization error: {e}")))?;
-        for shard in decoded.tenants.values_mut() {
-            if shard.placement_policy == "\"Single\"" {
-                // Backward compat for test data after PR https://github.com/neondatabase/neon/pull/7165
-                shard.placement_policy = "{\"Attached\":0}".to_string();
-            }
-
-            if shard.scheduling_policy.is_empty() {
-                shard.scheduling_policy =
-                    serde_json::to_string(&ShardSchedulingPolicy::default()).unwrap();
-            }
-        }
-
-        let tenants: Vec<TenantShardPersistence> = decoded.tenants.into_values().collect();
-
-        // Synchronize database with what is in the JSON file
-        self.insert_tenant_shards(tenants.clone()).await?;
-
-        Ok(tenants)
-    }
-
-    /// For use in testing environments, where we dump out JSON on shutdown.
-    pub async fn write_tenants_json(&self) -> anyhow::Result<()> {
-        let Some(path) = &self.json_path else {
-            anyhow::bail!("Cannot write JSON if path isn't set (test environment bug)");
-        };
-        tracing::info!("Writing state to {path}...");
-        let tenants = self.list_tenant_shards().await?;
-        let mut tenants_map = HashMap::new();
-        for tsp in tenants {
-            let tenant_shard_id = TenantShardId {
-                tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?,
-                shard_number: ShardNumber(tsp.shard_number as u8),
-                shard_count: ShardCount::new(tsp.shard_count as u8),
-            };
-
-            tenants_map.insert(tenant_shard_id, tsp);
-        }
-        let json = serde_json::to_string(&JsonPersistence {
-            tenants: tenants_map,
-        })?;
-
-        tokio::fs::write(path, &json).await?;
-        tracing::info!("Wrote {} bytes to {path}...", json.len());
-
-        Ok(())
+        self.with_measured_conn(
+            DatabaseOperation::ListTenantShards,
+            move |conn| -> DatabaseResult<_> {
+                Ok(crate::schema::tenant_shards::table.load::<TenantShardPersistence>(conn)?)
+            },
+        )
+        .await
     }
 
     /// Tenants must be persisted before we schedule them for the first time. This enables us
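`Persistence::new` now takes only the database URL. For readers unfamiliar with diesel's r2d2 integration, here is a self-contained sketch of the pool construction it wraps; the timeouts mirror the `IDLE_CONNECTION_TIMEOUT` and `MAX_CONNECTION_LIFETIME` constants visible above, while the `max_size` value is an assumption, not taken from this diff:

```rust
// Sketch of the diesel r2d2 pool setup that Persistence::new wraps. The pool
// exists primarily to *limit* connection count, not to speed up acquisition.
use diesel::pg::PgConnection;
use diesel::r2d2::{ConnectionManager, Pool};
use std::time::Duration;

fn build_pool(database_url: String) -> Pool<ConnectionManager<PgConnection>> {
    let manager = ConnectionManager::<PgConnection>::new(database_url);
    Pool::builder()
        .idle_timeout(Some(Duration::from_secs(10))) // IDLE_CONNECTION_TIMEOUT
        .max_lifetime(Some(Duration::from_secs(60))) // MAX_CONNECTION_LIFETIME
        .max_size(10) // assumption: not taken from this diff
        .build(manager)
        .expect("Could not build connection pool")
}
```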
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 2765ff916e..fcfd4ea676 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -31,6 +31,7 @@ import backoff
 import httpx
 import jwt
 import psycopg2
+import psycopg2.sql
 import pytest
 import requests
 import toml
@@ -727,8 +728,30 @@ class NeonEnvBuilder:
             self.repo_dir / "local_fs_remote_storage",
         )
 
-        if (attachments_json := Path(repo_dir / "attachments.json")).exists():
-            shutil.copyfile(attachments_json, self.repo_dir / attachments_json.name)
+        # restore storage controller (the db is small, don't bother with overlayfs)
+        storcon_db_from_dir = repo_dir / "storage_controller_db"
+        storcon_db_to_dir = self.repo_dir / "storage_controller_db"
+        log.info(f"Copying storage_controller_db from {storcon_db_from_dir} to {storcon_db_to_dir}")
+        assert storcon_db_from_dir.is_dir()
+        assert not storcon_db_to_dir.exists()
+
+        def ignore_postgres_log(path: str, _names):
+            if Path(path) == storcon_db_from_dir:
+                return {"postgres.log"}
+            return set()
+
+        shutil.copytree(storcon_db_from_dir, storcon_db_to_dir, ignore=ignore_postgres_log)
+        assert not (storcon_db_to_dir / "postgres.log").exists()
+
+        # NB: neon_local rewrites postgresql.conf on each start based on neon_local config. No need to patch it.
+        # However, in this new NeonEnv, the pageservers listen on different ports, and the storage controller
+        # will currently reject re-attach requests from them because the NodeMetadata isn't identical.
+        # So, from_repo_dir patches up the storcon database.
+        patch_script_path = self.repo_dir / "storage_controller_db.startup.sql"
+        assert not patch_script_path.exists()
+        patch_script = ""
+        for ps in self.env.pageservers:
+            patch_script += f"UPDATE nodes SET listen_http_port={ps.service_port.http}, listen_pg_port={ps.service_port.pg} WHERE node_id = '{ps.id}';"
+        patch_script_path.write_text(patch_script)
 
         # Update the config with info about tenants and timelines
         with (self.repo_dir / "config").open("r") as f:
diff --git a/test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py b/test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py
index 60861cf939..949813c984 100644
--- a/test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py
+++ b/test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py
@@ -255,11 +255,3 @@ def run_pagebench_benchmark(
         unit="ms",
         report=MetricReport.LOWER_IS_BETTER,
     )
-
-    env.storage_controller.allowed_errors.append(
-        # The test setup swaps NeonEnv instances, hence different
-        # pg instances are used for the storage controller db. This means
-        # the storage controller doesn't know about the nodes mentioned
-        # in attachments.json at start-up.
-        ".* Scheduler missing node 1",
-    )
diff --git a/test_runner/regress/test_compatibility.py b/test_runner/regress/test_compatibility.py
index 1e5e320e0e..65649e0c0a 100644
--- a/test_runner/regress/test_compatibility.py
+++ b/test_runner/regress/test_compatibility.py
@@ -93,29 +93,6 @@ check_ondisk_data_compatibility_if_enabled = pytest.mark.skipif(
 )
 
 
-def fixup_storage_controller(env: NeonEnv):
-    """
-    After importing a repo_dir, we need to massage the storage controller's state a bit: it will have
-    initially started up with no nodes, but some tenants, and thereby those tenants won't be scheduled
-    anywhere.
-
-    After NeonEnv.start() is done (i.e. nodes are started + registered), call this function to get
-    the storage controller into a good state.
-
-    This function should go away once compat tests carry the controller database in their snapshots, so
-    that the controller properly remembers nodes between creating + restoring the snapshot.
-    """
-    env.storage_controller.allowed_errors.extend(
-        [
-            ".*Tenant shard .+ references non-existent node.*",
-            ".*Failed to schedule tenant .+ at startup.*",
-        ]
-    )
-    env.storage_controller.stop()
-    env.storage_controller.start()
-    env.storage_controller.reconcile_until_idle()
-
-
 @pytest.mark.xdist_group("compatibility")
 @pytest.mark.order(before="test_forward_compatibility")
 def test_create_snapshot(
@@ -198,7 +175,6 @@ def test_backward_compatibility(
     neon_env_builder.num_safekeepers = 3
     env = neon_env_builder.from_repo_dir(compatibility_snapshot_dir / "repo")
     neon_env_builder.start()
-    fixup_storage_controller(env)
 
     check_neon_works(
         env,
@@ -287,7 +263,6 @@ def test_forward_compatibility(
     assert not env.pageserver.log_contains("git-env:" + prev_pageserver_version)
 
     neon_env_builder.start()
-    fixup_storage_controller(env)
 
     # ensure the specified pageserver is running
     assert env.pageserver.log_contains("git-env:" + prev_pageserver_version)