control_plane: database persistence for attachment_service (#6468)

## Problem

Spun off from https://github.com/neondatabase/neon/pull/6394 -- this PR
is just the persistence parts and the changes that enable it to work
nicely


## Summary of changes

- Revert #6444 and #6450
- In neon_local, start a vanilla postgres instance for the attachment
service to use.
- Adopt `diesel` crate for database access in attachment service. This
uses raw SQL migrations as the source of truth for the schema, so it's a
soft dependency: we can switch libraries pretty easily.
- Rewrite persistence.rs to use postgres (via diesel) instead of JSON.
- Preserve JSON read+write at startup and shutdown: this enables using
the JSON format in compatibility tests, so that we don't have to commit
to our DB schema yet.
- In neon_local, run database creation + migrations before starting
attachment service
- Run the initial reconciliation in Service::spawn in the background, so
that the pageserver + attachment service don't get stuck waiting for
each other to start, when restarting both together in a test.
This commit is contained in:
John Spray
2024-01-26 17:20:44 +00:00
committed by GitHub
parent dcc7610ad6
commit 58f6cb649e
28 changed files with 1168 additions and 471 deletions

View File

@@ -1,5 +1,11 @@
use crate::{background_process, local_env::LocalEnv};
use camino::Utf8PathBuf;
use camino::{Utf8Path, Utf8PathBuf};
use diesel::{
backend::Backend,
query_builder::{AstPass, QueryFragment, QueryId},
Connection, PgConnection, QueryResult, RunQueryDsl,
};
use diesel_migrations::{HarnessWithOutput, MigrationHarness};
use hyper::Method;
use pageserver_api::{
models::{ShardParameters, TenantCreateRequest, TimelineCreateRequest, TimelineInfo},
@@ -7,9 +13,9 @@ use pageserver_api::{
};
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use postgres_backend::AuthType;
use postgres_connection::parse_host_port;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{path::PathBuf, str::FromStr};
use std::{env, str::FromStr};
use tokio::process::Command;
use tracing::instrument;
use utils::{
auth::{Claims, Scope},
@@ -19,14 +25,17 @@ use utils::{
pub struct AttachmentService {
env: LocalEnv,
listen: String,
path: PathBuf,
path: Utf8PathBuf,
jwt_token: Option<String>,
public_key_path: Option<Utf8PathBuf>,
postgres_port: u16,
client: reqwest::Client,
}
const COMMAND: &str = "attachment_service";
const ATTACHMENT_SERVICE_POSTGRES_VERSION: u32 = 16;
#[derive(Serialize, Deserialize)]
pub struct AttachHookRequest {
pub tenant_shard_id: TenantShardId,
@@ -169,7 +178,9 @@ pub struct TenantShardMigrateResponse {}
impl AttachmentService {
pub fn from_env(env: &LocalEnv) -> Self {
let path = env.base_data_dir.join("attachments.json");
let path = Utf8PathBuf::from_path_buf(env.base_data_dir.clone())
.unwrap()
.join("attachments.json");
// Makes no sense to construct this if pageservers aren't going to use it: assume
// pageservers have control plane API set
@@ -181,6 +192,13 @@ impl AttachmentService {
listen_url.port().unwrap()
);
// Convention: NeonEnv in python tests reserves the next port after the control_plane_api
// port, for use by our captive postgres.
let postgres_port = listen_url
.port()
.expect("Control plane API setting should always have a port")
+ 1;
// Assume all pageservers have symmetric auth configuration: this service
// expects to use one JWT token to talk to all of them.
let ps_conf = env
@@ -209,6 +227,7 @@ impl AttachmentService {
listen,
jwt_token,
public_key_path,
postgres_port,
client: reqwest::ClientBuilder::new()
.build()
.expect("Failed to construct http client"),
@@ -220,13 +239,214 @@ impl AttachmentService {
.expect("non-Unicode path")
}
pub async fn start(&self) -> anyhow::Result<()> {
let path_str = self.path.to_string_lossy();
/// PIDFile for the postgres instance used to store attachment service state
fn postgres_pid_file(&self) -> Utf8PathBuf {
Utf8PathBuf::from_path_buf(
self.env
.base_data_dir
.join("attachment_service_postgres.pid"),
)
.expect("non-Unicode path")
}
let mut args = vec!["-l", &self.listen, "-p", &path_str]
.into_iter()
.map(|s| s.to_string())
.collect::<Vec<_>>();
/// In order to access database migrations, we need to find the Neon source tree
async fn find_source_root(&self) -> anyhow::Result<Utf8PathBuf> {
// We assume that either prd or our binary is in the source tree. The former is usually
// true for automated test runners, the latter is usually true for developer workstations. Often
// both are true, which is fine.
let candidate_start_points = [
// Current working directory
Utf8PathBuf::from_path_buf(std::env::current_dir()?).unwrap(),
// Directory containing the binary we're running inside
Utf8PathBuf::from_path_buf(env::current_exe()?.parent().unwrap().to_owned()).unwrap(),
];
// For each candidate start point, search through ancestors looking for a neon.git source tree root
for start_point in &candidate_start_points {
// Start from the build dir: assumes we are running out of a built neon source tree
for path in start_point.ancestors() {
// A crude approximation: the root of the source tree is whatever contains a "control_plane"
// subdirectory.
let control_plane = path.join("control_plane");
if tokio::fs::try_exists(&control_plane).await? {
return Ok(path.to_owned());
}
}
}
// Fall-through
Err(anyhow::anyhow!(
"Could not find control_plane src dir, after searching ancestors of {candidate_start_points:?}"
))
}
/// Find the directory containing postgres binaries, such as `initdb` and `pg_ctl`
///
/// This usually uses ATTACHMENT_SERVICE_POSTGRES_VERSION of postgres, but will fall back
/// to other versions if that one isn't found. Some automated tests create circumstances
/// where only one version is available in pg_distrib_dir, such as `test_remote_extensions`.
pub async fn get_pg_bin_dir(&self) -> anyhow::Result<Utf8PathBuf> {
let prefer_versions = [ATTACHMENT_SERVICE_POSTGRES_VERSION, 15, 14];
for v in prefer_versions {
let path = Utf8PathBuf::from_path_buf(self.env.pg_bin_dir(v)?).unwrap();
if tokio::fs::try_exists(&path).await? {
return Ok(path);
}
}
// Fall through
anyhow::bail!(
"Postgres binaries not found in {}",
self.env.pg_distrib_dir.display()
);
}
/// Readiness check for our postgres process
async fn pg_isready(&self, pg_bin_dir: &Utf8Path) -> anyhow::Result<bool> {
let bin_path = pg_bin_dir.join("pg_isready");
let args = ["-h", "localhost", "-p", &format!("{}", self.postgres_port)];
let exitcode = Command::new(bin_path).args(args).spawn()?.wait().await?;
Ok(exitcode.success())
}
/// Create our database if it doesn't exist, and run migrations.
///
/// This function is equivalent to the `diesel setup` command in the diesel CLI. We implement
/// the same steps by hand to avoid imposing a dependency on installing diesel-cli for developers
/// who just want to run `cargo neon_local` without knowing about diesel.
///
/// Returns the database url
pub async fn setup_database(&self) -> anyhow::Result<String> {
let database_url = format!(
"postgresql://localhost:{}/attachment_service",
self.postgres_port
);
println!("Running attachment service database setup...");
fn change_database_of_url(database_url: &str, default_database: &str) -> (String, String) {
let base = ::url::Url::parse(database_url).unwrap();
let database = base.path_segments().unwrap().last().unwrap().to_owned();
let mut new_url = base.join(default_database).unwrap();
new_url.set_query(base.query());
(database, new_url.into())
}
#[derive(Debug, Clone)]
pub struct CreateDatabaseStatement {
db_name: String,
}
impl CreateDatabaseStatement {
pub fn new(db_name: &str) -> Self {
CreateDatabaseStatement {
db_name: db_name.to_owned(),
}
}
}
impl<DB: Backend> QueryFragment<DB> for CreateDatabaseStatement {
fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, DB>) -> QueryResult<()> {
out.push_sql("CREATE DATABASE ");
out.push_identifier(&self.db_name)?;
Ok(())
}
}
impl<Conn> RunQueryDsl<Conn> for CreateDatabaseStatement {}
impl QueryId for CreateDatabaseStatement {
type QueryId = ();
const HAS_STATIC_QUERY_ID: bool = false;
}
if PgConnection::establish(&database_url).is_err() {
let (database, postgres_url) = change_database_of_url(&database_url, "postgres");
println!("Creating database: {database}");
let mut conn = PgConnection::establish(&postgres_url)?;
CreateDatabaseStatement::new(&database).execute(&mut conn)?;
}
let mut conn = PgConnection::establish(&database_url)?;
let migrations_dir = self
.find_source_root()
.await?
.join("control_plane/attachment_service/migrations");
let migrations = diesel_migrations::FileBasedMigrations::from_path(migrations_dir)?;
println!("Running migrations in {}", migrations.path().display());
HarnessWithOutput::write_to_stdout(&mut conn)
.run_pending_migrations(migrations)
.map(|_| ())
.map_err(|e| anyhow::anyhow!(e))?;
println!("Migrations complete");
Ok(database_url)
}
pub async fn start(&self) -> anyhow::Result<()> {
// Start a vanilla Postgres process used by the attachment service for persistence.
let pg_data_path = Utf8PathBuf::from_path_buf(self.env.base_data_dir.clone())
.unwrap()
.join("attachment_service_db");
let pg_bin_dir = self.get_pg_bin_dir().await?;
let pg_log_path = pg_data_path.join("postgres.log");
if !tokio::fs::try_exists(&pg_data_path).await? {
// Initialize empty database
let initdb_path = pg_bin_dir.join("initdb");
let mut child = Command::new(&initdb_path)
.args(["-D", pg_data_path.as_ref()])
.spawn()
.expect("Failed to spawn initdb");
let status = child.wait().await?;
if !status.success() {
anyhow::bail!("initdb failed with status {status}");
}
tokio::fs::write(
&pg_data_path.join("postgresql.conf"),
format!("port = {}", self.postgres_port),
)
.await?;
};
println!("Starting attachment service database...");
let db_start_args = [
"-w",
"-D",
pg_data_path.as_ref(),
"-l",
pg_log_path.as_ref(),
"start",
];
background_process::start_process(
"attachment_service_db",
&self.env.base_data_dir,
pg_bin_dir.join("pg_ctl").as_std_path(),
db_start_args,
[],
background_process::InitialPidFile::Create(self.postgres_pid_file()),
|| self.pg_isready(&pg_bin_dir),
)
.await?;
// Run migrations on every startup, in case something changed.
let database_url = self.setup_database().await?;
let mut args = vec![
"-l",
&self.listen,
"-p",
self.path.as_ref(),
"--database-url",
&database_url,
]
.into_iter()
.map(|s| s.to_string())
.collect::<Vec<_>>();
if let Some(jwt_token) = &self.jwt_token {
args.push(format!("--jwt-token={jwt_token}"));
}
@@ -235,7 +455,7 @@ impl AttachmentService {
args.push(format!("--public-key={public_key_path}"));
}
let result = background_process::start_process(
background_process::start_process(
COMMAND,
&self.env.base_data_dir,
&self.env.attachment_service_bin(),
@@ -252,30 +472,46 @@ impl AttachmentService {
}
},
)
.await;
.await?;
// TODO: shouldn't we bail if we fail to spawn the process?
for ps_conf in &self.env.pageservers {
let (pg_host, pg_port) =
parse_host_port(&ps_conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
let (http_host, http_port) = parse_host_port(&ps_conf.listen_http_addr)
.expect("Unable to parse listen_http_addr");
self.node_register(NodeRegisterRequest {
node_id: ps_conf.id,
listen_pg_addr: pg_host.to_string(),
listen_pg_port: pg_port.unwrap_or(5432),
listen_http_addr: http_host.to_string(),
listen_http_port: http_port.unwrap_or(80),
})
Ok(())
}
pub async fn stop(&self, immediate: bool) -> anyhow::Result<()> {
background_process::stop_process(immediate, COMMAND, &self.pid_file())?;
let pg_data_path = self.env.base_data_dir.join("attachment_service_db");
let pg_bin_dir = self.get_pg_bin_dir().await?;
println!("Stopping attachment service database...");
let pg_stop_args = ["-D", &pg_data_path.to_string_lossy(), "stop"];
let stop_status = Command::new(pg_bin_dir.join("pg_ctl"))
.args(pg_stop_args)
.spawn()?
.wait()
.await?;
if !stop_status.success() {
let pg_status_args = ["-D", &pg_data_path.to_string_lossy(), "status"];
let status_exitcode = Command::new(pg_bin_dir.join("pg_ctl"))
.args(pg_status_args)
.spawn()?
.wait()
.await?;
// pg_ctl status returns this exit code if postgres is not running: in this case it is
// fine that stop failed. Otherwise it is an error that stop failed.
const PG_STATUS_NOT_RUNNING: i32 = 3;
if Some(PG_STATUS_NOT_RUNNING) == status_exitcode.code() {
println!("Attachment service data base is already stopped");
return Ok(());
} else {
anyhow::bail!("Failed to stop attachment service database: {stop_status}")
}
}
result
Ok(())
}
pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
background_process::stop_process(immediate, COMMAND, &self.pid_file())
}
/// Simple HTTP request wrapper for calling into attachment service
async fn dispatch<RQ, RS>(
&self,
@@ -357,7 +593,7 @@ impl AttachmentService {
&self,
req: TenantCreateRequest,
) -> anyhow::Result<TenantCreateResponse> {
self.dispatch(Method::POST, "tenant".to_string(), Some(req))
self.dispatch(Method::POST, "v1/tenant".to_string(), Some(req))
.await
}
@@ -414,7 +650,7 @@ impl AttachmentService {
) -> anyhow::Result<TimelineInfo> {
self.dispatch(
Method::POST,
format!("tenant/{tenant_id}/timeline"),
format!("v1/tenant/{tenant_id}/timeline"),
Some(req),
)
.await

View File

@@ -135,7 +135,7 @@ fn main() -> Result<()> {
"tenant" => rt.block_on(handle_tenant(sub_args, &mut env)),
"timeline" => rt.block_on(handle_timeline(sub_args, &mut env)),
"start" => rt.block_on(handle_start_all(sub_args, &env)),
"stop" => handle_stop_all(sub_args, &env),
"stop" => rt.block_on(handle_stop_all(sub_args, &env)),
"pageserver" => rt.block_on(handle_pageserver(sub_args, &env)),
"attachment_service" => rt.block_on(handle_attachment_service(sub_args, &env)),
"safekeeper" => rt.block_on(handle_safekeeper(sub_args, &env)),
@@ -1056,8 +1056,9 @@ fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageSe
async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
match sub_match.subcommand() {
Some(("start", subcommand_args)) => {
let register = subcommand_args.get_one::<bool>("register").unwrap_or(&true);
if let Err(e) = get_pageserver(env, subcommand_args)?
.start(&pageserver_config_overrides(subcommand_args))
.start(&pageserver_config_overrides(subcommand_args), *register)
.await
{
eprintln!("pageserver start failed: {e}");
@@ -1086,24 +1087,7 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
}
if let Err(e) = pageserver
.start(&pageserver_config_overrides(subcommand_args))
.await
{
eprintln!("pageserver start failed: {e}");
exit(1);
}
}
Some(("migrate", subcommand_args)) => {
let pageserver = get_pageserver(env, subcommand_args)?;
//TODO what shutdown strategy should we use here?
if let Err(e) = pageserver.stop(false) {
eprintln!("pageserver stop failed: {}", e);
exit(1);
}
if let Err(e) = pageserver
.start(&pageserver_config_overrides(subcommand_args))
.start(&pageserver_config_overrides(subcommand_args), false)
.await
{
eprintln!("pageserver start failed: {e}");
@@ -1161,7 +1145,7 @@ async fn handle_attachment_service(
.map(|s| s.as_str())
== Some("immediate");
if let Err(e) = svc.stop(immediate) {
if let Err(e) = svc.stop(immediate).await {
eprintln!("stop failed: {}", e);
exit(1);
}
@@ -1257,7 +1241,7 @@ async fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
let attachment_service = AttachmentService::from_env(env);
if let Err(e) = attachment_service.start().await {
eprintln!("attachment_service start failed: {:#}", e);
try_stop_all(env, true);
try_stop_all(env, true).await;
exit(1);
}
}
@@ -1265,11 +1249,11 @@ async fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
for ps_conf in &env.pageservers {
let pageserver = PageServerNode::from_env(env, ps_conf);
if let Err(e) = pageserver
.start(&pageserver_config_overrides(sub_match))
.start(&pageserver_config_overrides(sub_match), true)
.await
{
eprintln!("pageserver {} start failed: {:#}", ps_conf.id, e);
try_stop_all(env, true);
try_stop_all(env, true).await;
exit(1);
}
}
@@ -1278,23 +1262,23 @@ async fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
let safekeeper = SafekeeperNode::from_env(env, node);
if let Err(e) = safekeeper.start(vec![]).await {
eprintln!("safekeeper {} start failed: {:#}", safekeeper.id, e);
try_stop_all(env, false);
try_stop_all(env, false).await;
exit(1);
}
}
Ok(())
}
fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
async fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let immediate =
sub_match.get_one::<String>("stop-mode").map(|s| s.as_str()) == Some("immediate");
try_stop_all(env, immediate);
try_stop_all(env, immediate).await;
Ok(())
}
fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
// Stop all endpoints
match ComputeControlPlane::load(env.clone()) {
Ok(cplane) => {
@@ -1329,7 +1313,7 @@ fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
if env.control_plane_api.is_some() {
let attachment_service = AttachmentService::from_env(env);
if let Err(e) = attachment_service.stop(immediate) {
if let Err(e) = attachment_service.stop(immediate).await {
eprintln!("attachment service stop failed: {e:#}");
}
}
@@ -1549,7 +1533,11 @@ fn cli() -> Command {
.subcommand(Command::new("status"))
.subcommand(Command::new("start")
.about("Start local pageserver")
.arg(pageserver_config_args.clone())
.arg(pageserver_config_args.clone()).arg(Arg::new("register")
.long("register")
.default_value("true").required(false)
.value_parser(value_parser!(bool))
.value_name("register"))
)
.subcommand(Command::new("stop")
.about("Stop local pageserver")

View File

@@ -223,7 +223,11 @@ impl LocalEnv {
}
pub fn attachment_service_bin(&self) -> PathBuf {
self.neon_distrib_dir.join("attachment_service")
// Irrespective of configuration, attachment service binary is always
// run from the same location as neon_local. This means that for compatibility
// tests that run old pageserver/safekeeper, they still run latest attachment service.
let neon_local_bin_dir = env::current_exe().unwrap().parent().unwrap().to_owned();
neon_local_bin_dir.join("attachment_service")
}
pub fn safekeeper_bin(&self) -> PathBuf {

View File

@@ -30,6 +30,7 @@ use utils::{
lsn::Lsn,
};
use crate::attachment_service::{AttachmentService, NodeRegisterRequest};
use crate::local_env::PageServerConf;
use crate::{background_process, local_env::LocalEnv};
@@ -161,8 +162,8 @@ impl PageServerNode {
.expect("non-Unicode path")
}
pub async fn start(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
self.start_node(config_overrides, false).await
pub async fn start(&self, config_overrides: &[&str], register: bool) -> anyhow::Result<()> {
self.start_node(config_overrides, false, register).await
}
fn pageserver_init(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
@@ -207,6 +208,7 @@ impl PageServerNode {
&self,
config_overrides: &[&str],
update_config: bool,
register: bool,
) -> anyhow::Result<()> {
// TODO: using a thread here because start_process() is not async but we need to call check_status()
let datadir = self.repo_path();
@@ -244,7 +246,26 @@ impl PageServerNode {
}
},
)
.await
.await?;
if register {
let attachment_service = AttachmentService::from_env(&self.env);
let (pg_host, pg_port) =
parse_host_port(&self.conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
let (http_host, http_port) = parse_host_port(&self.conf.listen_http_addr)
.expect("Unable to parse listen_http_addr");
attachment_service
.node_register(NodeRegisterRequest {
node_id: self.conf.id,
listen_pg_addr: pg_host.to_string(),
listen_pg_port: pg_port.unwrap_or(5432),
listen_http_addr: http_host.to_string(),
listen_http_port: http_port.unwrap_or(80),
})
.await?;
}
Ok(())
}
fn pageserver_basic_args<'a>(