mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 05:22:56 +00:00
control_plane: follow up for embedded migrations (#6647)
## Problem In https://github.com/neondatabase/neon/pull/6637, we remove the need to run migrations externally, but for compat tests to work we can't remove those invocations from the neon_local binary. Once that previous PR merges, we can make the followup changes without upsetting compat tests.
This commit is contained in:
4
Cargo.lock
generated
4
Cargo.lock
generated
@@ -1329,8 +1329,6 @@ dependencies = [
|
||||
"clap",
|
||||
"comfy-table",
|
||||
"compute_api",
|
||||
"diesel",
|
||||
"diesel_migrations",
|
||||
"futures",
|
||||
"git-version",
|
||||
"hex",
|
||||
@@ -6832,8 +6830,6 @@ dependencies = [
|
||||
"clap",
|
||||
"clap_builder",
|
||||
"crossbeam-utils",
|
||||
"diesel",
|
||||
"diesel_derives",
|
||||
"either",
|
||||
"fail",
|
||||
"futures-channel",
|
||||
|
||||
@@ -10,8 +10,6 @@ async-trait.workspace = true
|
||||
camino.workspace = true
|
||||
clap.workspace = true
|
||||
comfy-table.workspace = true
|
||||
diesel = { version = "2.1.4", features = ["postgres"]}
|
||||
diesel_migrations = { version = "2.1.0", features = ["postgres"]}
|
||||
futures.workspace = true
|
||||
git-version.workspace = true
|
||||
nix.workspace = true
|
||||
|
||||
@@ -1,11 +1,5 @@
|
||||
use crate::{background_process, local_env::LocalEnv};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use diesel::{
|
||||
backend::Backend,
|
||||
query_builder::{AstPass, QueryFragment, QueryId},
|
||||
Connection, PgConnection, QueryResult, RunQueryDsl,
|
||||
};
|
||||
use diesel_migrations::{HarnessWithOutput, MigrationHarness};
|
||||
use hyper::Method;
|
||||
use pageserver_api::{
|
||||
models::{
|
||||
@@ -17,7 +11,7 @@ use pageserver_api::{
|
||||
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
|
||||
use postgres_backend::AuthType;
|
||||
use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
||||
use std::{env, str::FromStr};
|
||||
use std::str::FromStr;
|
||||
use tokio::process::Command;
|
||||
use tracing::instrument;
|
||||
use url::Url;
|
||||
@@ -273,37 +267,6 @@ impl AttachmentService {
|
||||
.expect("non-Unicode path")
|
||||
}
|
||||
|
||||
/// In order to access database migrations, we need to find the Neon source tree
|
||||
async fn find_source_root(&self) -> anyhow::Result<Utf8PathBuf> {
|
||||
// We assume that either prd or our binary is in the source tree. The former is usually
|
||||
// true for automated test runners, the latter is usually true for developer workstations. Often
|
||||
// both are true, which is fine.
|
||||
let candidate_start_points = [
|
||||
// Current working directory
|
||||
Utf8PathBuf::from_path_buf(std::env::current_dir()?).unwrap(),
|
||||
// Directory containing the binary we're running inside
|
||||
Utf8PathBuf::from_path_buf(env::current_exe()?.parent().unwrap().to_owned()).unwrap(),
|
||||
];
|
||||
|
||||
// For each candidate start point, search through ancestors looking for a neon.git source tree root
|
||||
for start_point in &candidate_start_points {
|
||||
// Start from the build dir: assumes we are running out of a built neon source tree
|
||||
for path in start_point.ancestors() {
|
||||
// A crude approximation: the root of the source tree is whatever contains a "control_plane"
|
||||
// subdirectory.
|
||||
let control_plane = path.join("control_plane");
|
||||
if tokio::fs::try_exists(&control_plane).await? {
|
||||
return Ok(path.to_owned());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Fall-through
|
||||
Err(anyhow::anyhow!(
|
||||
"Could not find control_plane src dir, after searching ancestors of {candidate_start_points:?}"
|
||||
))
|
||||
}
|
||||
|
||||
/// Find the directory containing postgres binaries, such as `initdb` and `pg_ctl`
|
||||
///
|
||||
/// This usually uses ATTACHMENT_SERVICE_POSTGRES_VERSION of postgres, but will fall back
|
||||
@@ -343,69 +306,32 @@ impl AttachmentService {
|
||||
///
|
||||
/// Returns the database url
|
||||
pub async fn setup_database(&self) -> anyhow::Result<String> {
|
||||
let database_url = format!(
|
||||
"postgresql://localhost:{}/attachment_service",
|
||||
self.postgres_port
|
||||
);
|
||||
println!("Running attachment service database setup...");
|
||||
fn change_database_of_url(database_url: &str, default_database: &str) -> (String, String) {
|
||||
let base = ::url::Url::parse(database_url).unwrap();
|
||||
let database = base.path_segments().unwrap().last().unwrap().to_owned();
|
||||
let mut new_url = base.join(default_database).unwrap();
|
||||
new_url.set_query(base.query());
|
||||
(database, new_url.into())
|
||||
}
|
||||
const DB_NAME: &str = "attachment_service";
|
||||
let database_url = format!("postgresql://localhost:{}/{DB_NAME}", self.postgres_port);
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct CreateDatabaseStatement {
|
||||
db_name: String,
|
||||
}
|
||||
let pg_bin_dir = self.get_pg_bin_dir().await?;
|
||||
let createdb_path = pg_bin_dir.join("createdb");
|
||||
let output = Command::new(&createdb_path)
|
||||
.args([
|
||||
"-h",
|
||||
"localhost",
|
||||
"-p",
|
||||
&format!("{}", self.postgres_port),
|
||||
&DB_NAME,
|
||||
])
|
||||
.output()
|
||||
.await
|
||||
.expect("Failed to spawn createdb");
|
||||
|
||||
impl CreateDatabaseStatement {
|
||||
pub fn new(db_name: &str) -> Self {
|
||||
CreateDatabaseStatement {
|
||||
db_name: db_name.to_owned(),
|
||||
}
|
||||
if !output.status.success() {
|
||||
let stderr = String::from_utf8(output.stderr).expect("Non-UTF8 output from createdb");
|
||||
if stderr.contains("already exists") {
|
||||
tracing::info!("Database {DB_NAME} already exists");
|
||||
} else {
|
||||
anyhow::bail!("createdb failed with status {}: {stderr}", output.status);
|
||||
}
|
||||
}
|
||||
|
||||
impl<DB: Backend> QueryFragment<DB> for CreateDatabaseStatement {
|
||||
fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, DB>) -> QueryResult<()> {
|
||||
out.push_sql("CREATE DATABASE ");
|
||||
out.push_identifier(&self.db_name)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<Conn> RunQueryDsl<Conn> for CreateDatabaseStatement {}
|
||||
|
||||
impl QueryId for CreateDatabaseStatement {
|
||||
type QueryId = ();
|
||||
|
||||
const HAS_STATIC_QUERY_ID: bool = false;
|
||||
}
|
||||
if PgConnection::establish(&database_url).is_err() {
|
||||
let (database, postgres_url) = change_database_of_url(&database_url, "postgres");
|
||||
println!("Creating database: {database}");
|
||||
let mut conn = PgConnection::establish(&postgres_url)?;
|
||||
CreateDatabaseStatement::new(&database).execute(&mut conn)?;
|
||||
}
|
||||
let mut conn = PgConnection::establish(&database_url)?;
|
||||
|
||||
let migrations_dir = self
|
||||
.find_source_root()
|
||||
.await?
|
||||
.join("control_plane/attachment_service/migrations");
|
||||
|
||||
let migrations = diesel_migrations::FileBasedMigrations::from_path(migrations_dir)?;
|
||||
println!("Running migrations in {}", migrations.path().display());
|
||||
HarnessWithOutput::write_to_stdout(&mut conn)
|
||||
.run_pending_migrations(migrations)
|
||||
.map(|_| ())
|
||||
.map_err(|e| anyhow::anyhow!(e))?;
|
||||
|
||||
println!("Migrations complete");
|
||||
|
||||
Ok(database_url)
|
||||
}
|
||||
|
||||
|
||||
@@ -29,7 +29,6 @@ chrono = { version = "0.4", default-features = false, features = ["clock", "serd
|
||||
clap = { version = "4", features = ["derive", "string"] }
|
||||
clap_builder = { version = "4", default-features = false, features = ["color", "help", "std", "string", "suggestions", "usage"] }
|
||||
crossbeam-utils = { version = "0.8" }
|
||||
diesel = { version = "2", features = ["postgres", "r2d2", "serde_json"] }
|
||||
either = { version = "1" }
|
||||
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
|
||||
futures-channel = { version = "0.3", features = ["sink"] }
|
||||
@@ -90,7 +89,6 @@ anyhow = { version = "1", features = ["backtrace"] }
|
||||
bytes = { version = "1", features = ["serde"] }
|
||||
cc = { version = "1", default-features = false, features = ["parallel"] }
|
||||
chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }
|
||||
diesel_derives = { version = "2", features = ["32-column-tables", "postgres", "r2d2", "with-deprecated"] }
|
||||
either = { version = "1" }
|
||||
getrandom = { version = "0.2", default-features = false, features = ["std"] }
|
||||
hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", default-features = false, features = ["raw"] }
|
||||
|
||||
Reference in New Issue
Block a user