NeonEnv.from_repo_dir: use storage_controller_db instead of attachments.json (#8382)

When `NeonEnv.from_repo_dir` was introduced, the storage controller stored
its state exclusively in `attachments.json`.
Since then, it has moved to Postgres, which stores its state in
`storage_controller_db`.

But `NeonEnv.from_repo_dir` was never adjusted accordingly.
This PR rectifies the situation.

Context for this is failures in
`test_pageserver_characterize_throughput_with_n_tenants`; cf.
https://neondb.slack.com/archives/C033RQ5SPDH/p1721035799502239?thread_ts=1720901332.293769&cid=C033RQ5SPDH

Notably, `from_repo_dir` is also used by the backwards- and
forwards-compatibility tests, so the changes in this PR affect those tests
as well.
However, it turns out that the compatibility snapshot already contains
the `storage_controller_db`.
Thus, it should just work, and in fact we can remove hacks like
`fixup_storage_controller`.
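
In outline, the new flow looks roughly like this — a condensed Python sketch
of the fixture change in this diff (`restore_storcon_state` is a hypothetical
helper; the real code lives in `from_repo_dir`):

```python
import shutil
from pathlib import Path

def restore_storcon_state(snapshot_repo: Path, new_repo: Path, pageservers) -> None:
    # Copy the controller's Postgres data directory wholesale; it is small,
    # so no overlayfs tricks are needed. (The real code also skips the stale
    # top-level postgres.log.)
    shutil.copytree(
        snapshot_repo / "storage_controller_db",
        new_repo / "storage_controller_db",
    )
    # The restored NeonEnv hands out fresh ports to the pageservers, and the
    # controller would reject their re-attach requests because the
    # NodeMetadata no longer matches. Stage a startup SQL script, which the
    # controller executes and deletes on its next start, to patch the
    # `nodes` table.
    sql = "".join(
        f"UPDATE nodes SET listen_http_port={ps.service_port.http}, "
        f"listen_pg_port={ps.service_port.pg} WHERE node_id = '{ps.id}';"
        for ps in pageservers
    )
    (new_repo / "storage_controller_db.startup.sql").write_text(sql)
```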

Follow-ups created as part of this work:
* https://github.com/neondatabase/neon/issues/8399
* https://github.com/neondatabase/neon/issues/8400
Christian Schwarz, 2024-07-18 10:56:07 +02:00 (committed by GitHub)
parent 1303d47778, commit a2d170b6d0
9 changed files with 133 additions and 162 deletions

Cargo.lock (generated)

@@ -1368,6 +1368,7 @@ dependencies = [
"tracing",
"url",
"utils",
"whoami",
"workspace_hack",
]
@@ -4603,6 +4604,15 @@ dependencies = [
"bitflags 1.3.2",
]
[[package]]
name = "redox_syscall"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa"
dependencies = [
"bitflags 1.3.2",
]
[[package]]
name = "regex"
version = "1.10.2"
@@ -6972,6 +6982,12 @@ version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasite"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b"
[[package]]
name = "wasm-bindgen"
version = "0.2.92"
@@ -7124,6 +7140,17 @@ dependencies = [
"once_cell",
]
[[package]]
name = "whoami"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a44ab49fad634e88f55bf8f9bb3abd2f27d7204172a112c7c9987e01c1c94ea9"
dependencies = [
"redox_syscall 0.4.1",
"wasite",
"web-sys",
]
[[package]]
name = "winapi"
version = "0.3.9"


@@ -191,6 +191,7 @@ uuid = { version = "1.6.1", features = ["v4", "v7", "serde"] }
walkdir = "2.3.2"
rustls-native-certs = "0.7"
x509-parser = "0.15"
whoami = "1.5.1"
## TODO replace this with tracing
env_logger = "0.10"


@@ -40,6 +40,7 @@ safekeeper_api.workspace = true
postgres_connection.workspace = true
storage_broker.workspace = true
utils.workspace = true
whoami.workspace = true
compute_api.workspace = true
workspace_hack.workspace = true


@@ -29,7 +29,6 @@ use utils::{
pub struct StorageController {
env: LocalEnv,
listen: String,
path: Utf8PathBuf,
private_key: Option<Vec<u8>>,
public_key: Option<String>,
postgres_port: u16,
@@ -41,6 +40,8 @@ const COMMAND: &str = "storage_controller";
const STORAGE_CONTROLLER_POSTGRES_VERSION: u32 = 16;
const DB_NAME: &str = "storage_controller";
#[derive(Serialize, Deserialize)]
pub struct AttachHookRequest {
pub tenant_shard_id: TenantShardId,
@@ -65,10 +66,6 @@ pub struct InspectResponse {
impl StorageController {
pub fn from_env(env: &LocalEnv) -> Self {
let path = Utf8PathBuf::from_path_buf(env.base_data_dir.clone())
.unwrap()
.join("attachments.json");
// Makes no sense to construct this if pageservers aren't going to use it: assume
// pageservers have control plane API set
let listen_url = env.control_plane_api.clone().unwrap();
@@ -128,7 +125,6 @@ impl StorageController {
Self {
env: env.clone(),
path,
listen,
private_key,
public_key,
@@ -203,7 +199,6 @@ impl StorageController {
///
/// Returns the database url
pub async fn setup_database(&self) -> anyhow::Result<String> {
const DB_NAME: &str = "storage_controller";
let database_url = format!("postgresql://localhost:{}/{DB_NAME}", self.postgres_port);
let pg_bin_dir = self.get_pg_bin_dir().await?;
@@ -232,6 +227,30 @@ impl StorageController {
Ok(database_url)
}
pub async fn connect_to_database(
&self,
) -> anyhow::Result<(
tokio_postgres::Client,
tokio_postgres::Connection<tokio_postgres::Socket, tokio_postgres::tls::NoTlsStream>,
)> {
tokio_postgres::Config::new()
.host("localhost")
.port(self.postgres_port)
// The user is the ambient operating system user name. That is an impurity
// which we want to fix => TODO https://github.com/neondatabase/neon/issues/8400
//
// Until we get there, specify the ambient OS user name explicitly:
// recent tokio-postgres versions default to it when the user isn't set,
// but our tokio-postgres fork doesn't have this upstream commit yet:
// https://github.com/sfackler/rust-postgres/commit/cb609be758f3fb5af537f04b584a2ee0cebd5e79
// => we should rebase our fork => TODO https://github.com/neondatabase/neon/issues/8399
.user(&whoami::username())
.dbname(DB_NAME)
.connect(tokio_postgres::NoTls)
.await
.map_err(anyhow::Error::new)
}
pub async fn start(&self, retry_timeout: &Duration) -> anyhow::Result<()> {
// Start a vanilla Postgres process used by the storage controller for persistence.
let pg_data_path = Utf8PathBuf::from_path_buf(self.env.base_data_dir.clone())
@@ -256,18 +275,21 @@ impl StorageController {
if !status.success() {
anyhow::bail!("initdb failed with status {status}");
}
// Write a minimal config file:
// - Specify the port, since this is chosen dynamically
// - Switch off fsync, since we're running on lightweight test environments and when e.g. scale testing
// the storage controller we don't want a slow local disk to interfere with that.
tokio::fs::write(
&pg_data_path.join("postgresql.conf"),
format!("port = {}\nfsync=off\n", self.postgres_port),
)
.await?;
};
// Write a minimal config file:
// - Specify the port, since this is chosen dynamically
// - Switch off fsync, since we're running on lightweight test environments and when e.g. scale testing
// the storage controller we don't want a slow local disk to interfere with that.
//
// NB: it's important that we rewrite this file on each start command so we propagate changes
// from `LocalEnv`'s config file (`.neon/config`).
tokio::fs::write(
&pg_data_path.join("postgresql.conf"),
format!("port = {}\nfsync=off\n", self.postgres_port),
)
.await?;
println!("Starting storage controller database...");
let db_start_args = [
"-w",
@@ -296,11 +318,38 @@ impl StorageController {
// Run migrations on every startup, in case something changed.
let database_url = self.setup_database().await?;
// We support running a startup SQL script to fiddle with the database before we launch storcon.
// This is used by the test suite.
let startup_script_path = self
.env
.base_data_dir
.join("storage_controller_db.startup.sql");
let startup_script = match tokio::fs::read_to_string(&startup_script_path).await {
Ok(script) => {
tokio::fs::remove_file(startup_script_path).await?;
script
}
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
// Always run some startup script so that this code path doesn't bit-rot.
"BEGIN; COMMIT;".to_string()
} else {
anyhow::bail!("Failed to read startup script: {e}")
}
}
};
let (mut client, conn) = self.connect_to_database().await?;
let conn = tokio::spawn(conn);
let tx = client.build_transaction();
let tx = tx.start().await?;
tx.batch_execute(&startup_script).await?;
tx.commit().await?;
drop(client);
conn.await??;
let mut args = vec![
"-l",
&self.listen,
"-p",
self.path.as_ref(),
"--dev",
"--database-url",
&database_url,
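
To make the hook concrete from the caller's side: before a start, a test can
drop a SQL file next to the controller's data, and the controller will run it
in one transaction and delete it. A minimal sketch, assuming the `nodes`
columns used by the UPDATE statements later in this diff
(`stage_node_port_patch` is a hypothetical helper):

```python
from pathlib import Path

def stage_node_port_patch(
    base_data_dir: Path, node_id: int, http_port: int, pg_port: int
) -> None:
    # The controller looks for exactly this filename on startup, executes it
    # inside a single transaction, and removes the file afterwards, so the
    # patch applies exactly once.
    script = base_data_dir / "storage_controller_db.startup.sql"
    script.write_text(
        f"UPDATE nodes SET listen_http_port={http_port}, "
        f"listen_pg_port={pg_port} WHERE node_id = '{node_id}';"
    )
```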


@@ -1,5 +1,4 @@
use anyhow::{anyhow, Context};
use camino::Utf8PathBuf;
use clap::Parser;
use diesel::Connection;
use metrics::launch_timestamp::LaunchTimestamp;
@@ -51,10 +50,6 @@ struct Cli {
#[arg(long)]
compute_hook_url: Option<String>,
/// Path to the .json file to store state (will be created if it doesn't exist)
#[arg(short, long)]
path: Option<Utf8PathBuf>,
/// URL to connect to postgres, like postgresql://localhost:1234/storage_controller
#[arg(long)]
database_url: Option<String>,
@@ -206,11 +201,10 @@ async fn async_main() -> anyhow::Result<()> {
let args = Cli::parse();
tracing::info!(
"version: {}, launch_timestamp: {}, build_tag {}, state at {}, listening on {}",
"version: {}, launch_timestamp: {}, build_tag {}, listening on {}",
GIT_VERSION,
launch_ts.to_string(),
BUILD_TAG,
args.path.as_ref().unwrap_or(&Utf8PathBuf::from("<none>")),
args.listen
);
@@ -277,8 +271,7 @@ async fn async_main() -> anyhow::Result<()> {
.await
.context("Running database migrations")?;
let json_path = args.path;
let persistence = Arc::new(Persistence::new(secrets.database_url, json_path.clone()));
let persistence = Arc::new(Persistence::new(secrets.database_url));
let service = Service::spawn(config, persistence.clone()).await?;
@@ -316,14 +309,6 @@ async fn async_main() -> anyhow::Result<()> {
}
tracing::info!("Terminating on signal");
if json_path.is_some() {
// Write out a JSON dump on shutdown: this is used in compat tests to avoid passing
// full postgres dumps around.
if let Err(e) = persistence.write_tenants_json().await {
tracing::error!("Failed to write JSON on shutdown: {e}")
}
}
// Stop HTTP server first, so that we don't have to service requests
// while shutting down Service
server_shutdown.cancel();


@@ -5,8 +5,6 @@ use std::time::Duration;
use std::time::Instant;
use self::split_state::SplitState;
use camino::Utf8Path;
use camino::Utf8PathBuf;
use diesel::pg::PgConnection;
use diesel::prelude::*;
use diesel::Connection;
@@ -55,11 +53,6 @@ use crate::node::Node;
/// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline.
pub struct Persistence {
connection_pool: diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<PgConnection>>,
// In test environments, we support loading+saving a JSON file. This is temporary, for the benefit of
// test_compatibility.py, so that we don't have to commit to making the database contents fully backward/forward
// compatible just yet.
json_path: Option<Utf8PathBuf>,
}
/// Legacy format, for use in JSON compat objects in test environment
@@ -124,7 +117,7 @@ impl Persistence {
const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(60);
pub fn new(database_url: String, json_path: Option<Utf8PathBuf>) -> Self {
pub fn new(database_url: String) -> Self {
let manager = diesel::r2d2::ConnectionManager::<PgConnection>::new(database_url);
// We will use a connection pool: this is primarily to _limit_ our connection count, rather than to optimize time
@@ -139,10 +132,7 @@ impl Persistence {
.build(manager)
.expect("Could not build connection pool");
Self {
connection_pool,
json_path,
}
Self { connection_pool }
}
/// A helper for use during startup, where we would like to tolerate concurrent restarts of the
@@ -302,85 +292,13 @@ impl Persistence {
/// At startup, load the high level state for shards, such as their config + policy. This will
/// be enriched at runtime with state discovered on pageservers.
pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> {
let loaded = self
.with_measured_conn(
DatabaseOperation::ListTenantShards,
move |conn| -> DatabaseResult<_> {
Ok(crate::schema::tenant_shards::table.load::<TenantShardPersistence>(conn)?)
},
)
.await?;
if loaded.is_empty() {
if let Some(path) = &self.json_path {
if tokio::fs::try_exists(path)
.await
.map_err(|e| DatabaseError::Logical(format!("Error stat'ing JSON file: {e}")))?
{
tracing::info!("Importing from legacy JSON format at {path}");
return self.list_tenant_shards_json(path).await;
}
}
}
Ok(loaded)
}
/// Shim for automated compatibility tests: load tenants from a JSON file instead of database
pub(crate) async fn list_tenant_shards_json(
&self,
path: &Utf8Path,
) -> DatabaseResult<Vec<TenantShardPersistence>> {
let bytes = tokio::fs::read(path)
.await
.map_err(|e| DatabaseError::Logical(format!("Failed to load JSON: {e}")))?;
let mut decoded = serde_json::from_slice::<JsonPersistence>(&bytes)
.map_err(|e| DatabaseError::Logical(format!("Deserialization error: {e}")))?;
for shard in decoded.tenants.values_mut() {
if shard.placement_policy == "\"Single\"" {
// Backward compat for test data after PR https://github.com/neondatabase/neon/pull/7165
shard.placement_policy = "{\"Attached\":0}".to_string();
}
if shard.scheduling_policy.is_empty() {
shard.scheduling_policy =
serde_json::to_string(&ShardSchedulingPolicy::default()).unwrap();
}
}
let tenants: Vec<TenantShardPersistence> = decoded.tenants.into_values().collect();
// Synchronize database with what is in the JSON file
self.insert_tenant_shards(tenants.clone()).await?;
Ok(tenants)
}
/// For use in testing environments, where we dump out JSON on shutdown.
pub async fn write_tenants_json(&self) -> anyhow::Result<()> {
let Some(path) = &self.json_path else {
anyhow::bail!("Cannot write JSON if path isn't set (test environment bug)");
};
tracing::info!("Writing state to {path}...");
let tenants = self.list_tenant_shards().await?;
let mut tenants_map = HashMap::new();
for tsp in tenants {
let tenant_shard_id = TenantShardId {
tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?,
shard_number: ShardNumber(tsp.shard_number as u8),
shard_count: ShardCount::new(tsp.shard_count as u8),
};
tenants_map.insert(tenant_shard_id, tsp);
}
let json = serde_json::to_string(&JsonPersistence {
tenants: tenants_map,
})?;
tokio::fs::write(path, &json).await?;
tracing::info!("Wrote {} bytes to {path}...", json.len());
Ok(())
self.with_measured_conn(
DatabaseOperation::ListTenantShards,
move |conn| -> DatabaseResult<_> {
Ok(crate::schema::tenant_shards::table.load::<TenantShardPersistence>(conn)?)
},
)
.await
}
/// Tenants must be persisted before we schedule them for the first time. This enables us


@@ -31,6 +31,7 @@ import backoff
import httpx
import jwt
import psycopg2
import psycopg2.sql
import pytest
import requests
import toml
@@ -727,8 +728,30 @@ class NeonEnvBuilder:
self.repo_dir / "local_fs_remote_storage",
)
if (attachments_json := Path(repo_dir / "attachments.json")).exists():
shutil.copyfile(attachments_json, self.repo_dir / attachments_json.name)
# restore storage controller (the db is small, don't bother with overlayfs)
storcon_db_from_dir = repo_dir / "storage_controller_db"
storcon_db_to_dir = self.repo_dir / "storage_controller_db"
log.info(f"Copying storage_controller_db from {storcon_db_from_dir} to {storcon_db_to_dir}")
assert storcon_db_from_dir.is_dir()
assert not storcon_db_to_dir.exists()
def ignore_postgres_log(path: str, _names):
if Path(path) == storcon_db_from_dir:
return {"postgres.log"}
return set()
shutil.copytree(storcon_db_from_dir, storcon_db_to_dir, ignore=ignore_postgres_log)
assert not (storcon_db_to_dir / "postgres.log").exists()
# NB: neon_local rewrites postgresql.conf on each start based on neon_local config. No need to patch it.
# However, in this new NeonEnv, the pageservers listen on different ports, and the storage controller
# will currently reject re-attach requests from them because the NodeMetadata isn't identical.
# So, from_repo_dir patches up the storcon database.
patch_script_path = self.repo_dir / "storage_controller_db.startup.sql"
assert not patch_script_path.exists()
patch_script = ""
for ps in self.env.pageservers:
patch_script += f"UPDATE nodes SET listen_http_port={ps.service_port.http}, listen_pg_port={ps.service_port.pg} WHERE node_id = '{ps.id}';"
patch_script_path.write_text(patch_script)
# Update the config with info about tenants and timelines
with (self.repo_dir / "config").open("r") as f:


@@ -255,11 +255,3 @@ def run_pagebench_benchmark(
unit="ms",
report=MetricReport.LOWER_IS_BETTER,
)
env.storage_controller.allowed_errors.append(
# The test setup swaps NeonEnv instances, hence different
# pg instances are used for the storage controller db. This means
# the storage controller doesn't know about the nodes mentioned
# in attachments.json at start-up.
".* Scheduler missing node 1",
)


@@ -93,29 +93,6 @@ check_ondisk_data_compatibility_if_enabled = pytest.mark.skipif(
)
def fixup_storage_controller(env: NeonEnv):
"""
After importing a repo_dir, we need to massage the storage controller's state a bit: it will have
initially started up with no nodes, but some tenants, and thereby those tenants won't be scheduled
anywhere.
After NeonEnv.start() is done (i.e. nodes are started + registered), call this function to get
the storage controller into a good state.
This function should go away once compat tests carry the controller database in their snapshots, so
that the controller properly remembers nodes between creating + restoring the snapshot.
"""
env.storage_controller.allowed_errors.extend(
[
".*Tenant shard .+ references non-existent node.*",
".*Failed to schedule tenant .+ at startup.*",
]
)
env.storage_controller.stop()
env.storage_controller.start()
env.storage_controller.reconcile_until_idle()
@pytest.mark.xdist_group("compatibility")
@pytest.mark.order(before="test_forward_compatibility")
def test_create_snapshot(
@@ -198,7 +175,6 @@ def test_backward_compatibility(
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.from_repo_dir(compatibility_snapshot_dir / "repo")
neon_env_builder.start()
fixup_storage_controller(env)
check_neon_works(
env,
@@ -287,7 +263,6 @@ def test_forward_compatibility(
assert not env.pageserver.log_contains("git-env:" + prev_pageserver_version)
neon_env_builder.start()
fixup_storage_controller(env)
# ensure the specified pageserver is running
assert env.pageserver.log_contains("git-env:" + prev_pageserver_version)