diff --git a/Cargo.lock b/Cargo.lock index 6e91363de8..f0bcfb762a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -278,6 +278,7 @@ dependencies = [ "camino", "clap", "control_plane", + "diesel", "futures", "git-version", "hyper", @@ -286,7 +287,6 @@ dependencies = [ "pageserver_client", "postgres_backend", "postgres_connection", - "scopeguard", "serde", "serde_json", "thiserror", @@ -1328,6 +1328,8 @@ dependencies = [ "clap", "comfy-table", "compute_api", + "diesel", + "diesel_migrations", "futures", "git-version", "hex", @@ -1638,6 +1640,52 @@ dependencies = [ "rusticata-macros", ] +[[package]] +name = "diesel" +version = "2.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62c6fcf842f17f8c78ecf7c81d75c5ce84436b41ee07e03f490fbb5f5a8731d8" +dependencies = [ + "bitflags 2.4.1", + "byteorder", + "diesel_derives", + "itoa", + "pq-sys", + "serde_json", +] + +[[package]] +name = "diesel_derives" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef8337737574f55a468005a83499da720f20c65586241ffea339db9ecdfd2b44" +dependencies = [ + "diesel_table_macro_syntax", + "proc-macro2", + "quote", + "syn 2.0.32", +] + +[[package]] +name = "diesel_migrations" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6036b3f0120c5961381b570ee20a02432d7e2d27ea60de9578799cf9156914ac" +dependencies = [ + "diesel", + "migrations_internals", + "migrations_macros", +] + +[[package]] +name = "diesel_table_macro_syntax" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc5557efc453706fed5e4fa85006fe9817c224c3f480a34c7e5959fd700921c5" +dependencies = [ + "syn 2.0.32", +] + [[package]] name = "digest" version = "0.10.7" @@ -2787,6 +2835,27 @@ dependencies = [ "workspace_hack", ] +[[package]] +name = "migrations_internals" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f23f71580015254b020e856feac3df5878c2c7a8812297edd6c0a485ac9dada" +dependencies = [ + "serde", + "toml", +] + +[[package]] +name = "migrations_macros" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cce3325ac70e67bbab5bd837a31cae01f1a6db64e0e744a33cb03a543469ef08" +dependencies = [ + "migrations_internals", + "proc-macro2", + "quote", +] + [[package]] name = "mime" version = "0.3.17" @@ -3795,6 +3864,15 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "pq-sys" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31c0052426df997c0cbd30789eb44ca097e3541717a7b8fa36b1c464ee7edebd" +dependencies = [ + "vcpkg", +] + [[package]] name = "pq_proto" version = "0.1.0" @@ -6623,6 +6701,7 @@ dependencies = [ "clap", "clap_builder", "crossbeam-utils", + "diesel", "either", "fail", "futures-channel", diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml index 75e5dcb7f8..09c171f1d3 100644 --- a/control_plane/Cargo.toml +++ b/control_plane/Cargo.toml @@ -10,6 +10,8 @@ async-trait.workspace = true camino.workspace = true clap.workspace = true comfy-table.workspace = true +diesel = { version = "2.1.4", features = ["postgres"]} +diesel_migrations = { version = "2.1.0", features = ["postgres"]} futures.workspace = true git-version.workspace = true nix.workspace = true diff --git a/control_plane/attachment_service/Cargo.toml 
b/control_plane/attachment_service/Cargo.toml index 743dd806c4..6fc21810bc 100644 --- a/control_plane/attachment_service/Cargo.toml +++ b/control_plane/attachment_service/Cargo.toml @@ -14,7 +14,6 @@ hyper.workspace = true pageserver_api.workspace = true pageserver_client.workspace = true postgres_connection.workspace = true -scopeguard.workspace = true serde.workspace = true serde_json.workspace = true thiserror.workspace = true @@ -26,6 +25,8 @@ tracing.workspace = true # a parsing function when loading pageservers from neon_local LocalEnv postgres_backend.workspace = true +diesel = { version = "2.1.4", features = ["serde_json", "postgres"] } + utils = { path = "../../libs/utils/" } metrics = { path = "../../libs/metrics/" } control_plane = { path = ".." } diff --git a/control_plane/attachment_service/migrations/.keep b/control_plane/attachment_service/migrations/.keep new file mode 100644 index 0000000000..e69de29bb2 diff --git a/control_plane/attachment_service/migrations/00000000000000_diesel_initial_setup/down.sql b/control_plane/attachment_service/migrations/00000000000000_diesel_initial_setup/down.sql new file mode 100644 index 0000000000..a9f5260911 --- /dev/null +++ b/control_plane/attachment_service/migrations/00000000000000_diesel_initial_setup/down.sql @@ -0,0 +1,6 @@ +-- This file was automatically created by Diesel to setup helper functions +-- and other internal bookkeeping. This file is safe to edit, any future +-- changes will be added to existing projects as new migrations. + +DROP FUNCTION IF EXISTS diesel_manage_updated_at(_tbl regclass); +DROP FUNCTION IF EXISTS diesel_set_updated_at(); diff --git a/control_plane/attachment_service/migrations/00000000000000_diesel_initial_setup/up.sql b/control_plane/attachment_service/migrations/00000000000000_diesel_initial_setup/up.sql new file mode 100644 index 0000000000..d68895b1a7 --- /dev/null +++ b/control_plane/attachment_service/migrations/00000000000000_diesel_initial_setup/up.sql @@ -0,0 +1,36 @@ +-- This file was automatically created by Diesel to setup helper functions +-- and other internal bookkeeping. This file is safe to edit, any future +-- changes will be added to existing projects as new migrations. 
+ + + + +-- Sets up a trigger for the given table to automatically set a column called +-- `updated_at` whenever the row is modified (unless `updated_at` was included +-- in the modified columns) +-- +-- # Example +-- +-- ```sql +-- CREATE TABLE users (id SERIAL PRIMARY KEY, updated_at TIMESTAMP NOT NULL DEFAULT NOW()); +-- +-- SELECT diesel_manage_updated_at('users'); +-- ``` +CREATE OR REPLACE FUNCTION diesel_manage_updated_at(_tbl regclass) RETURNS VOID AS $$ +BEGIN + EXECUTE format('CREATE TRIGGER set_updated_at BEFORE UPDATE ON %s + FOR EACH ROW EXECUTE PROCEDURE diesel_set_updated_at()', _tbl); +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION diesel_set_updated_at() RETURNS trigger AS $$ +BEGIN + IF ( + NEW IS DISTINCT FROM OLD AND + NEW.updated_at IS NOT DISTINCT FROM OLD.updated_at + ) THEN + NEW.updated_at := current_timestamp; + END IF; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; diff --git a/control_plane/attachment_service/migrations/2024-01-07-211257_create_tenant_shards/down.sql b/control_plane/attachment_service/migrations/2024-01-07-211257_create_tenant_shards/down.sql new file mode 100644 index 0000000000..b875b91c00 --- /dev/null +++ b/control_plane/attachment_service/migrations/2024-01-07-211257_create_tenant_shards/down.sql @@ -0,0 +1 @@ +DROP TABLE tenant_shards; diff --git a/control_plane/attachment_service/migrations/2024-01-07-211257_create_tenant_shards/up.sql b/control_plane/attachment_service/migrations/2024-01-07-211257_create_tenant_shards/up.sql new file mode 100644 index 0000000000..585dbc79a0 --- /dev/null +++ b/control_plane/attachment_service/migrations/2024-01-07-211257_create_tenant_shards/up.sql @@ -0,0 +1,12 @@ +CREATE TABLE tenant_shards ( + tenant_id VARCHAR NOT NULL, + shard_number INTEGER NOT NULL, + shard_count INTEGER NOT NULL, + PRIMARY KEY(tenant_id, shard_number, shard_count), + shard_stripe_size INTEGER NOT NULL, + generation INTEGER NOT NULL, + generation_pageserver BIGINT NOT NULL, + placement_policy VARCHAR NOT NULL, + -- config is JSON encoded, opaque to the database. 
+ config TEXT NOT NULL ); \ No newline at end of file diff --git a/control_plane/attachment_service/migrations/2024-01-07-212945_create_nodes/down.sql b/control_plane/attachment_service/migrations/2024-01-07-212945_create_nodes/down.sql new file mode 100644 index 0000000000..ec303bc8cf --- /dev/null +++ b/control_plane/attachment_service/migrations/2024-01-07-212945_create_nodes/down.sql @@ -0,0 +1 @@ +DROP TABLE nodes; diff --git a/control_plane/attachment_service/migrations/2024-01-07-212945_create_nodes/up.sql b/control_plane/attachment_service/migrations/2024-01-07-212945_create_nodes/up.sql new file mode 100644 index 0000000000..9be0880fa4 --- /dev/null +++ b/control_plane/attachment_service/migrations/2024-01-07-212945_create_nodes/up.sql @@ -0,0 +1,10 @@ +CREATE TABLE nodes ( + node_id BIGINT PRIMARY KEY NOT NULL, + + scheduling_policy VARCHAR NOT NULL, + + listen_http_addr VARCHAR NOT NULL, + listen_http_port INTEGER NOT NULL, + listen_pg_addr VARCHAR NOT NULL, + listen_pg_port INTEGER NOT NULL ); \ No newline at end of file diff --git a/control_plane/attachment_service/src/http.rs b/control_plane/attachment_service/src/http.rs index 30f6dd66ee..81f21a8e7a 100644 --- a/control_plane/attachment_service/src/http.rs +++ b/control_plane/attachment_service/src/http.rs @@ -1,5 +1,5 @@ use crate::reconciler::ReconcileError; -use crate::service::Service; +use crate::service::{Service, STARTUP_RECONCILE_TIMEOUT}; use hyper::{Body, Request, Response}; use hyper::{StatusCode, Uri}; use pageserver_api::models::{TenantCreateRequest, TimelineCreateRequest}; @@ -104,34 +104,34 @@ async fn handle_inspect(mut req: Request<Body>) -> Result<Response<Body>, ApiErr json_response(StatusCode::OK, state.service.inspect(inspect_req)) } -async fn handle_tenant_create(mut req: Request<Body>) -> Result<Response<Body>, ApiError> { +async fn handle_tenant_create( + service: Arc<Service>, + mut req: Request<Body>, +) -> Result<Response<Body>, ApiError> { let create_req = json_request::<TenantCreateRequest>(&mut req).await?; - let state = get_state(&req); - json_response( - StatusCode::OK, - state.service.tenant_create(create_req).await?, - ) + json_response(StatusCode::OK, service.tenant_create(create_req).await?) } -async fn handle_tenant_timeline_create(mut req: Request<Body>) -> Result<Response<Body>, ApiError> { +async fn handle_tenant_timeline_create( + service: Arc<Service>, + mut req: Request<Body>, +) -> Result<Response<Body>, ApiError> { let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; let create_req = json_request::<TimelineCreateRequest>(&mut req).await?; - - let state = get_state(&req); json_response( StatusCode::OK, - state - .service + service .tenant_timeline_create(tenant_id, create_req) .await?, ) } -async fn handle_tenant_locate(req: Request<Body>) -> Result<Response<Body>, ApiError> { +async fn handle_tenant_locate( + service: Arc<Service>, + req: Request<Body>, +) -> Result<Response<Body>, ApiError> { let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; - let state = get_state(&req); - - json_response(StatusCode::OK, state.service.tenant_locate(tenant_id)?) + json_response(StatusCode::OK, service.tenant_locate(tenant_id)?) } async fn handle_node_register(mut req: Request<Body>) -> Result<Response<Body>, ApiError> { @@ -154,14 +154,15 @@ async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>, json_response(StatusCode::OK, state.service.node_configure(config_req)?)
} -async fn handle_tenant_shard_migrate(mut req: Request<Body>) -> Result<Response<Body>, ApiError> { +async fn handle_tenant_shard_migrate( + service: Arc<Service>, + mut req: Request<Body>, +) -> Result<Response<Body>, ApiError> { let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?; let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?; - let state = get_state(&req); json_response( StatusCode::OK, - state - .service + service .tenant_shard_migrate(tenant_shard_id, migrate_req) .await?, ) @@ -178,6 +179,35 @@ impl From<ReconcileError> for ApiError { } } +/// Common wrapper for request handlers that call into Service and will operate on tenants: they must only +/// be allowed to run if Service has finished its initial reconciliation. +async fn tenant_service_handler<R, H>(request: Request<Body>, handler: H) -> R::Output +where + R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static, + H: FnOnce(Arc<Service>, Request<Body>) -> R + Send + Sync + 'static, +{ + let state = get_state(&request); + let service = state.service.clone(); + + let startup_complete = service.startup_complete.clone(); + if tokio::time::timeout(STARTUP_RECONCILE_TIMEOUT, startup_complete.wait()) + .await + .is_err() + { + // This shouldn't happen: it is the responsibility of [`Service::startup_reconcile`] to use appropriate + // timeouts around its remote calls, to bound its runtime. + return Err(ApiError::Timeout( + "Timed out waiting for service readiness".into(), + )); + } + + request_span( + request, + |request| async move { handler(service, request).await }, + ) + .await +} + pub fn make_router( service: Arc<Service>, auth: Option<Arc<SwappableJwtAuth>>, @@ -205,14 +235,20 @@ pub fn make_router( .put("/node/:node_id/config", |r| { request_span(r, handle_node_configure) }) - .post("/tenant", |r| request_span(r, handle_tenant_create)) - .post("/tenant/:tenant_id/timeline", |r| { - request_span(r, handle_tenant_timeline_create) + .post("/v1/tenant", |r| { + tenant_service_handler(r, handle_tenant_create) + }) + .post("/v1/tenant/:tenant_id/timeline", |r| { + tenant_service_handler(r, handle_tenant_timeline_create) }) .get("/tenant/:tenant_id/locate", |r| { - request_span(r, handle_tenant_locate) + tenant_service_handler(r, handle_tenant_locate) }) .put("/tenant/:tenant_shard_id/migrate", |r| { - request_span(r, handle_tenant_shard_migrate) + tenant_service_handler(r, handle_tenant_shard_migrate) }) + // Path aliases for tests_forward_compatibility + // TODO: remove these in future PR + .post("/re-attach", |r| request_span(r, handle_re_attach)) + .post("/validate", |r| request_span(r, handle_validate)) } diff --git a/control_plane/attachment_service/src/lib.rs b/control_plane/attachment_service/src/lib.rs index e4ca9aa304..082afb4157 100644 --- a/control_plane/attachment_service/src/lib.rs +++ b/control_plane/attachment_service/src/lib.rs @@ -7,6 +7,7 @@ mod node; pub mod persistence; mod reconciler; mod scheduler; +mod schema; pub mod service; mod tenant_state; diff --git a/control_plane/attachment_service/src/main.rs b/control_plane/attachment_service/src/main.rs index 38e51b9a9e..05a3895dfa 100644 --- a/control_plane/attachment_service/src/main.rs +++ b/control_plane/attachment_service/src/main.rs @@ -12,9 +12,9 @@ use camino::Utf8PathBuf; use clap::Parser; use metrics::launch_timestamp::LaunchTimestamp; use std::sync::Arc; +use tokio::signal::unix::SignalKind; use utils::auth::{JwtAuth, SwappableJwtAuth}; use utils::logging::{self, LogFormat}; -use utils::signals::{ShutdownSignals, Signal}; use utils::{project_build_tag, project_git_version, tcp_listener}; @@ -40,6 +40,10 @@ struct Cli { /// Path to the .json file
to store state (will be created if it doesn't exist) #[arg(short, long)] path: Utf8PathBuf, + + /// URL to connect to postgres, like postgresql://localhost:1234/attachment_service + #[arg(long)] + database_url: String, } #[tokio::main] @@ -66,9 +70,14 @@ async fn main() -> anyhow::Result<()> { jwt_token: args.jwt_token, }; - let persistence = Arc::new(Persistence::spawn(&args.path).await); + let json_path = if args.path.as_os_str().is_empty() { + None + } else { + Some(args.path) + }; + let persistence = Arc::new(Persistence::new(args.database_url, json_path.clone())); - let service = Service::spawn(config, persistence).await?; + let service = Service::spawn(config, persistence.clone()).await?; let http_listener = tcp_listener::bind(args.listen)?; @@ -81,20 +90,31 @@ async fn main() -> anyhow::Result<()> { let router = make_router(service, auth) .build() .map_err(|err| anyhow!(err))?; - let service = utils::http::RouterService::new(router).unwrap(); - let server = hyper::Server::from_tcp(http_listener)?.serve(service); + let router_service = utils::http::RouterService::new(router).unwrap(); + let server = hyper::Server::from_tcp(http_listener)?.serve(router_service); tracing::info!("Serving on {0}", args.listen); tokio::task::spawn(server); - ShutdownSignals::handle(|signal| match signal { - Signal::Interrupt | Signal::Terminate | Signal::Quit => { - tracing::info!("Got {}. Terminating", signal.name()); - // We're just a test helper: no graceful shutdown. - std::process::exit(0); - } - })?; + // Wait until we receive a signal + let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?; + let mut sigquit = tokio::signal::unix::signal(SignalKind::quit())?; + let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())?; + tokio::select! { + _ = sigint.recv() => {}, + _ = sigterm.recv() => {}, + _ = sigquit.recv() => {}, + } + tracing::info!("Terminating on signal"); - Ok(()) + if json_path.is_some() { + // Write out a JSON dump on shutdown: this is used in compat tests to avoid passing + // full postgres dumps around. 
+ if let Err(e) = persistence.write_tenants_json().await { + tracing::error!("Failed to write JSON on shutdown: {e}") + } + } + + std::process::exit(0); } diff --git a/control_plane/attachment_service/src/node.rs b/control_plane/attachment_service/src/node.rs index efd3f8f49b..47f61702d8 100644 --- a/control_plane/attachment_service/src/node.rs +++ b/control_plane/attachment_service/src/node.rs @@ -1,6 +1,8 @@ use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy}; use utils::id::NodeId; +use crate::persistence::NodePersistence; + #[derive(Clone)] pub(crate) struct Node { pub(crate) id: NodeId, @@ -34,4 +36,15 @@ impl Node { NodeSchedulingPolicy::Pause => false, } } + + pub(crate) fn to_persistent(&self) -> NodePersistence { + NodePersistence { + node_id: self.id.0 as i64, + scheduling_policy: self.scheduling.into(), + listen_http_addr: self.listen_http_addr.clone(), + listen_http_port: self.listen_http_port as i32, + listen_pg_addr: self.listen_pg_addr.clone(), + listen_pg_port: self.listen_pg_port as i32, + } + } } diff --git a/control_plane/attachment_service/src/persistence.rs b/control_plane/attachment_service/src/persistence.rs index e944a2e9ed..b27bd2bf2e 100644 --- a/control_plane/attachment_service/src/persistence.rs +++ b/control_plane/attachment_service/src/persistence.rs @@ -1,182 +1,161 @@ -use std::{collections::HashMap, str::FromStr}; +use std::collections::HashMap; +use std::str::FromStr; -use camino::{Utf8Path, Utf8PathBuf}; -use control_plane::{ - attachment_service::{NodeAvailability, NodeSchedulingPolicy}, - local_env::LocalEnv, -}; -use pageserver_api::{ - models::TenantConfig, - shard::{ShardCount, ShardNumber, TenantShardId}, -}; +use camino::Utf8Path; +use camino::Utf8PathBuf; +use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy}; +use diesel::pg::PgConnection; +use diesel::prelude::*; +use diesel::Connection; +use pageserver_api::models::TenantConfig; +use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId}; use postgres_connection::parse_host_port; use serde::{Deserialize, Serialize}; -use tracing::info; -use utils::{ - generation::Generation, - id::{NodeId, TenantId}, -}; +use utils::generation::Generation; +use utils::id::{NodeId, TenantId}; -use crate::{node::Node, PlacementPolicy}; +use crate::node::Node; +use crate::PlacementPolicy; -/// Placeholder for storage. This will be replaced with a database client. +/// ## What do we store? +/// +/// The attachment service does not store most of its state durably. +/// +/// The essential things to store durably are: +/// - generation numbers, as these must always advance monotonically to ensure data safety. +/// - Tenant's PlacementPolicy and TenantConfig, as the source of truth for these is something external. +/// - Node's scheduling policies, as the source of truth for these is something external. +/// +/// Other things we store durably as an implementation detail: +/// - Node's host/port: this could be avoided if we made nodes emit a self-registering heartbeat, +/// but it is operationally simpler to make this service the authority for which nodes +/// it talks to. +/// +/// ## Performance/efficiency +/// +/// The attachment service does not go via the database for most things: there are +/// a couple of places where we must, and where efficiency matters: +/// - Incrementing generation numbers: the Reconciler has to wait for this to complete +/// before it can attach a tenant, so this acts as a bound on how fast things like +/// failover can happen.
+/// - Pageserver re-attach: we will increment many shards' generations when this happens, +/// so it is important to avoid e.g. issuing O(N) queries. +/// +/// Database calls relating to nodes have low performance requirements, as they are very rarely +/// updated, and reads of nodes are always from memory, not the database. We only require that +/// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline. pub struct Persistence { - inner: std::sync::Mutex<Inner>, -} - -struct Inner { - state: PersistentState, - write_queue_tx: tokio::sync::mpsc::UnboundedSender<PendingWrite>, + database_url: String, + + // In test environments, we support loading+saving a JSON file. This is temporary, for the benefit of + // test_compatibility.py, so that we don't have to commit to making the database contents fully backward/forward + // compatible just yet. + json_path: Option<Utf8PathBuf>, } +/// Legacy format, for use in JSON compat objects in test environment #[derive(Serialize, Deserialize)] -struct PersistentState { +struct JsonPersistence { tenants: HashMap<TenantShardId, TenantShardPersistence>, } -struct PendingWrite { - bytes: Vec<u8>, - done_tx: tokio::sync::oneshot::Sender<()>, +#[derive(thiserror::Error, Debug)] +pub(crate) enum DatabaseError { + #[error(transparent)] + Query(#[from] diesel::result::Error), + #[error(transparent)] + Connection(#[from] diesel::result::ConnectionError), + #[error("Logical error: {0}")] + Logical(String), } -impl PersistentState { - async fn load(path: &Utf8Path) -> anyhow::Result<Self> { - let bytes = tokio::fs::read(path).await?; - let mut decoded = serde_json::from_slice::<PersistentState>(&bytes)?; - - for (tenant_id, tenant) in &mut decoded.tenants { - // Backward compat: an old attachments.json from before PR #6251, replace - // empty strings with proper defaults. - if tenant.tenant_id.is_empty() { - tenant.tenant_id = format!("{}", tenant_id); - tenant.config = serde_json::to_string(&TenantConfig::default())?; - tenant.placement_policy = serde_json::to_string(&PlacementPolicy::default())?; - } - } - - Ok(decoded) - } - - async fn load_or_new(path: &Utf8Path) -> Self { - match Self::load(path).await { - Ok(s) => { - tracing::info!("Loaded state file at {}", path); - s - } - Err(e) - if e.downcast_ref::<std::io::Error>() - .map(|e| e.kind() == std::io::ErrorKind::NotFound) - .unwrap_or(false) => - { - tracing::info!("Will create state file at {}", path); - Self { - tenants: HashMap::new(), - } - } - Err(e) => { - panic!("Failed to load state from '{}': {e:#} (maybe your .neon/ dir was written by an older version?)", path) - } - } - } -} +pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>; impl Persistence { - pub async fn spawn(path: &Utf8Path) -> Self { - let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - let state = PersistentState::load_or_new(path).await; - tokio::spawn(Self::writer_task(rx, path.to_owned())); + pub fn new(database_url: String, json_path: Option<Utf8PathBuf>) -> Self { Self { - inner: std::sync::Mutex::new(Inner { - state, - write_queue_tx: tx, - }), + database_url, + json_path, } } - async fn writer_task( - mut rx: tokio::sync::mpsc::UnboundedReceiver<PendingWrite>, - path: Utf8PathBuf, - ) { - scopeguard::defer!
{ - info!("persistence writer task exiting"); - }; - loop { - match rx.recv().await { - Some(write) => { - tokio::task::spawn_blocking({ - let path = path.clone(); - move || { - let tmp_path = - utils::crashsafe::path_with_suffix_extension(&path, "___new"); - utils::crashsafe::overwrite(&path, &tmp_path, &write.bytes) - } - }) - .await - .expect("spawn_blocking") - .expect("write file"); - let _ = write.done_tx.send(()); // receiver may lose interest any time - } - None => { - return; - } - } - } - } - - /// Perform a modification on our [`PersistentState`]. - /// Return a future that completes once our modification has been persisted. - /// The output of the future is the return value of the `txn`` closure. - async fn mutating_transaction(&self, txn: F) -> R + /// Call the provided function in a tokio blocking thread, with a Diesel database connection. + async fn with_conn(&self, func: F) -> DatabaseResult where - F: FnOnce(&mut PersistentState) -> R, + F: Fn(&mut PgConnection) -> DatabaseResult + Send + 'static, + R: Send + 'static, { - let (ret, done_rx) = { - let mut inner = self.inner.lock().unwrap(); - let ret = txn(&mut inner.state); - let (done_tx, done_rx) = tokio::sync::oneshot::channel(); - let write = PendingWrite { - bytes: serde_json::to_vec(&inner.state).expect("Serialization error"), - done_tx, - }; - inner - .write_queue_tx - .send(write) - .expect("writer task always outlives self"); - (ret, done_rx) - }; - // the write task can go away once we start .await'ing - let _: () = done_rx.await.expect("writer task dead, check logs"); - ret + let database_url = self.database_url.clone(); + tokio::task::spawn_blocking(move || -> DatabaseResult { + // TODO: connection pooling, such as via diesel::r2d2 + let mut conn = PgConnection::establish(&database_url)?; + func(&mut conn) + }) + .await + .expect("Task panic") } - /// When registering a node, persist it so that on next start we will be able to - /// iterate over known nodes to synchronize their tenant shard states with our observed state. - pub(crate) async fn insert_node(&self, _node: &Node) -> anyhow::Result<()> { - // TODO: node persitence will come with database backend - Ok(()) + /// When a node is first registered, persist it before using it for anything + pub(crate) async fn insert_node(&self, node: &Node) -> DatabaseResult<()> { + let np = node.to_persistent(); + self.with_conn(move |conn| -> DatabaseResult<()> { + diesel::insert_into(crate::schema::nodes::table) + .values(&np) + .execute(conn)?; + Ok(()) + }) + .await } - /// At startup, we populate the service's list of nodes, and use this list to call into - /// each node to do an initial reconciliation of the state of the world with our in-memory - /// observed state. - pub(crate) async fn list_nodes(&self) -> anyhow::Result> { - let env = LocalEnv::load_config()?; - // TODO: node persitence will come with database backend + /// At startup, populate the list of nodes which our shards may be placed on + pub(crate) async fn list_nodes(&self) -> DatabaseResult> { + let nodes: Vec = self + .with_conn(move |conn| -> DatabaseResult<_> { + Ok(crate::schema::nodes::table + .load::(conn)? + .into_iter() + .map(|n| Node { + id: NodeId(n.node_id as u64), + // At startup we consider a node offline until proven otherwise. 
+ availability: NodeAvailability::Offline, + scheduling: NodeSchedulingPolicy::from_str(&n.scheduling_policy) + .expect("Bad scheduling policy in DB"), + listen_http_addr: n.listen_http_addr, + listen_http_port: n.listen_http_port as u16, + listen_pg_addr: n.listen_pg_addr, + listen_pg_port: n.listen_pg_port as u16, + }) + .collect::<Vec<Node>>()) + }) + .await?; - // XXX hack: enable test_backward_compatibility to work by populating our list of + if nodes.is_empty() { + return self.list_nodes_local_env().await; + } + + tracing::info!("list_nodes: loaded {} nodes", nodes.len()); + + Ok(nodes) + } + + /// Shim for automated compatibility tests: load nodes from LocalEnv instead of database + pub(crate) async fn list_nodes_local_env(&self) -> DatabaseResult<Vec<Node>> { + // Enable test_backward_compatibility to work by populating our list of // nodes from LocalEnv when it is not present in persistent storage. Otherwise at // first startup in the compat test, we may have shards but no nodes. - let mut result = Vec::new(); + use control_plane::local_env::LocalEnv; + let env = LocalEnv::load_config().map_err(|e| DatabaseError::Logical(format!("{e}")))?; tracing::info!( - "Loaded {} pageserver nodes from LocalEnv", + "Loading {} pageserver nodes from LocalEnv", env.pageservers.len() ); + let mut nodes = Vec::new(); for ps_conf in env.pageservers { let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr).expect("Unable to parse listen_pg_addr"); let (http_host, http_port) = parse_host_port(&ps_conf.listen_http_addr) .expect("Unable to parse listen_http_addr"); - result.push(Node { + let node = Node { id: ps_conf.id, listen_pg_addr: pg_host.to_string(), listen_pg_port: pg_port.unwrap_or(5432), @@ -184,16 +163,96 @@ impl Persistence { listen_http_port: http_port.unwrap_or(80), availability: NodeAvailability::Active, scheduling: NodeSchedulingPolicy::Active, - }); + }; + + // Synchronize database with what we learn from LocalEnv + self.insert_node(&node).await?; + + nodes.push(node); } - Ok(result) + Ok(nodes) } - /// At startup, we populate our map of tenant shards from persistent storage. - pub(crate) async fn list_tenant_shards(&self) -> anyhow::Result<Vec<TenantShardPersistence>> { - let inner = self.inner.lock().unwrap(); - Ok(inner.state.tenants.values().cloned().collect()) + /// At startup, load the high level state for shards, such as their config + policy. This will + /// be enriched at runtime with state discovered on pageservers. + pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> { + let loaded = self + .with_conn(move |conn| -> DatabaseResult<_> { + Ok(crate::schema::tenant_shards::table.load::<TenantShardPersistence>(conn)?) + }) + .await?; + + if loaded.is_empty() { + if let Some(path) = &self.json_path { + if tokio::fs::try_exists(path) + .await + .map_err(|e| DatabaseError::Logical(format!("Error stat'ing JSON file: {e}")))?
{ + tracing::info!("Importing from legacy JSON format at {path}"); + return self.list_tenant_shards_json(path).await; + } + } + } + Ok(loaded) + } + + /// Shim for automated compatibility tests: load tenants from a JSON file instead of database + pub(crate) async fn list_tenant_shards_json( + &self, + path: &Utf8Path, + ) -> DatabaseResult<Vec<TenantShardPersistence>> { + let bytes = tokio::fs::read(path) + .await + .map_err(|e| DatabaseError::Logical(format!("Failed to load JSON: {e}")))?; + + let mut decoded = serde_json::from_slice::<JsonPersistence>(&bytes) + .map_err(|e| DatabaseError::Logical(format!("Deserialization error: {e}")))?; + for (tenant_id, tenant) in &mut decoded.tenants { + // Backward compat: an old attachments.json from before PR #6251, replace + // empty strings with proper defaults. + if tenant.tenant_id.is_empty() { + tenant.tenant_id = tenant_id.to_string(); + tenant.config = serde_json::to_string(&TenantConfig::default()) + .map_err(|e| DatabaseError::Logical(format!("Serialization error: {e}")))?; + tenant.placement_policy = serde_json::to_string(&PlacementPolicy::default()) + .map_err(|e| DatabaseError::Logical(format!("Serialization error: {e}")))?; + } + } + + let tenants: Vec<TenantShardPersistence> = decoded.tenants.into_values().collect(); + + // Synchronize database with what is in the JSON file + self.insert_tenant_shards(tenants.clone()).await?; + + Ok(tenants) + } + + /// For use in testing environments, where we dump out JSON on shutdown. + pub async fn write_tenants_json(&self) -> anyhow::Result<()> { + let Some(path) = &self.json_path else { + anyhow::bail!("Cannot write JSON if path isn't set (test environment bug)"); + }; + tracing::info!("Writing state to {path}..."); + let tenants = self.list_tenant_shards().await?; + let mut tenants_map = HashMap::new(); + for tsp in tenants { + let tenant_shard_id = TenantShardId { + tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?, + shard_number: ShardNumber(tsp.shard_number as u8), + shard_count: ShardCount(tsp.shard_count as u8), + }; + + tenants_map.insert(tenant_shard_id, tsp); + } + let json = serde_json::to_string(&JsonPersistence { + tenants: tenants_map, + })?; + + tokio::fs::write(path, &json).await?; + tracing::info!("Wrote {} bytes to {path}...", json.len()); + + Ok(()) } /// Tenants must be persisted before we schedule them for the first time. This enables us @@ -201,22 +260,79 @@ impl Persistence { pub(crate) async fn insert_tenant_shards( &self, shards: Vec<TenantShardPersistence>, - ) -> anyhow::Result<()> { - self.mutating_transaction(|locked| { - for shard in shards { - let tenant_shard_id = TenantShardId { - tenant_id: TenantId::from_str(shard.tenant_id.as_str())?, - shard_number: ShardNumber(shard.shard_number as u8), - shard_count: ShardCount(shard.shard_count as u8), - }; - - locked.tenants.insert(tenant_shard_id, shard); - } + ) -> DatabaseResult<()> { + use crate::schema::tenant_shards::dsl::*; + self.with_conn(move |conn| -> DatabaseResult<()> { + conn.transaction(|conn| -> QueryResult<()> { + for tenant in &shards { + diesel::insert_into(tenant_shards) + .values(tenant) + .execute(conn)?; + } + Ok(()) + })?; Ok(()) }) .await } + /// Ordering: call this _after_ deleting the tenant on pageservers, but _before_ dropping state for + /// the tenant from memory on this server.
+ #[allow(unused)] + pub(crate) async fn delete_tenant(&self, del_tenant_id: TenantId) -> DatabaseResult<()> { + use crate::schema::tenant_shards::dsl::*; + self.with_conn(move |conn| -> DatabaseResult<()> { + diesel::delete(tenant_shards) + .filter(tenant_id.eq(del_tenant_id.to_string())) + .execute(conn)?; + + Ok(()) + }) + .await + } + + /// When a tenant invokes the /re-attach API, this function is responsible for doing an efficient + /// batched increment of the generations of all tenants whose generation_pageserver is equal to + /// the node that called /re-attach. + #[tracing::instrument(skip_all, fields(node_id))] + pub(crate) async fn re_attach( + &self, + node_id: NodeId, + ) -> DatabaseResult<HashMap<TenantShardId, Generation>> { + use crate::schema::tenant_shards::dsl::*; + let updated = self + .with_conn(move |conn| { + let rows_updated = diesel::update(tenant_shards) + .filter(generation_pageserver.eq(node_id.0 as i64)) + .set(generation.eq(generation + 1)) + .execute(conn)?; + + tracing::info!("Incremented {} tenants' generations", rows_updated); + + // TODO: UPDATE+SELECT in one query + + let updated = tenant_shards + .filter(generation_pageserver.eq(node_id.0 as i64)) + .select(TenantShardPersistence::as_select()) + .load(conn)?; + Ok(updated) + }) + .await?; + + let mut result = HashMap::new(); + for tsp in updated { + let tenant_shard_id = TenantShardId { + tenant_id: TenantId::from_str(tsp.tenant_id.as_str()) + .map_err(|e| DatabaseError::Logical(format!("Malformed tenant id: {e}")))?, + shard_number: ShardNumber(tsp.shard_number as u8), + shard_count: ShardCount(tsp.shard_count as u8), + }; + result.insert(tenant_shard_id, Generation::new(tsp.generation as u32)); + } + + Ok(result) + } + /// Reconciler calls this immediately before attaching to a new pageserver, to acquire a unique, monotonically /// advancing generation number. We also store the NodeId for which the generation was issued, so that in /// [`Self::re_attach`] we can do a bulk UPDATE on the generations for that node.
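+ /// (A sketch for the `TODO: UPDATE+SELECT in one query` above: Postgres can return the updated rows
+ /// straight from the UPDATE, so the follow-up SELECT in [`Self::re_attach`] could be folded into one
+ /// statement; Diesel 2.x exposes this through `.returning(...)`, the same mechanism used below:
+ /// ```sql
+ /// UPDATE tenant_shards SET generation = generation + 1
+ /// WHERE generation_pageserver = $1
+ /// RETURNING tenant_id, shard_number, shard_count, generation;
+ /// ```)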
@@ -225,47 +341,46 @@ impl Persistence { tenant_shard_id: TenantShardId, node_id: NodeId, ) -> anyhow::Result { - self.mutating_transaction(|locked| { - let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) else { - anyhow::bail!("Tried to increment generation of unknown shard"); - }; + use crate::schema::tenant_shards::dsl::*; + let updated = self + .with_conn(move |conn| { + let updated = diesel::update(tenant_shards) + .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string())) + .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32)) + .filter(shard_count.eq(tenant_shard_id.shard_count.0 as i32)) + .set(( + generation.eq(generation + 1), + generation_pageserver.eq(node_id.0 as i64), + )) + // TODO: only returning() the generation column + .returning(TenantShardPersistence::as_returning()) + .get_result(conn)?; - shard.generation += 1; - shard.generation_pageserver = Some(node_id); + Ok(updated) + }) + .await?; - let gen = Generation::new(shard.generation); - Ok(gen) - }) - .await + Ok(Generation::new(updated.generation as u32)) } pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> { - self.mutating_transaction(|locked| { - let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) else { - anyhow::bail!("Tried to increment generation of unknown shard"); - }; - shard.generation_pageserver = None; - shard.placement_policy = serde_json::to_string(&PlacementPolicy::Detached).unwrap(); - Ok(()) - }) - .await - } + use crate::schema::tenant_shards::dsl::*; + self.with_conn(move |conn| { + let updated = diesel::update(tenant_shards) + .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string())) + .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32)) + .filter(shard_count.eq(tenant_shard_id.shard_count.0 as i32)) + .set(( + generation_pageserver.eq(i64::MAX), + placement_policy.eq(serde_json::to_string(&PlacementPolicy::Detached).unwrap()), + )) + .execute(conn)?; - pub(crate) async fn re_attach( - &self, - node_id: NodeId, - ) -> anyhow::Result> { - self.mutating_transaction(|locked| { - let mut result = HashMap::new(); - for (tenant_shard_id, shard) in locked.tenants.iter_mut() { - if shard.generation_pageserver == Some(node_id) { - shard.generation += 1; - result.insert(*tenant_shard_id, Generation::new(shard.generation)); - } - } - Ok(result) + Ok(updated) }) - .await + .await?; + + Ok(()) } // TODO: when we start shard splitting, we must durably mark the tenant so that @@ -285,7 +400,8 @@ impl Persistence { } /// Parts of [`crate::tenant_state::TenantState`] that are stored durably -#[derive(Serialize, Deserialize, Clone)] +#[derive(Queryable, Selectable, Insertable, Serialize, Deserialize, Clone)] +#[diesel(table_name = crate::schema::tenant_shards)] pub(crate) struct TenantShardPersistence { #[serde(default)] pub(crate) tenant_id: String, @@ -296,16 +412,28 @@ pub(crate) struct TenantShardPersistence { #[serde(default)] pub(crate) shard_stripe_size: i32, - // Currently attached pageserver - #[serde(rename = "pageserver")] - pub(crate) generation_pageserver: Option, - // Latest generation number: next time we attach, increment this // and use the incremented number when attaching - pub(crate) generation: u32, + pub(crate) generation: i32, + + // Currently attached pageserver + #[serde(rename = "pageserver")] + pub(crate) generation_pageserver: i64, #[serde(default)] pub(crate) placement_policy: String, #[serde(default)] pub(crate) config: String, } + +/// Parts of [`crate::node::Node`] that are stored durably 
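+/// (Note: `derive(Queryable)` maps row columns to struct fields by position rather than by name, so the
+/// field order in this struct and in `TenantShardPersistence` above is assumed to match the column order
+/// declared for the corresponding table in `schema.rs`.)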
+#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable)] +#[diesel(table_name = crate::schema::nodes)] +pub(crate) struct NodePersistence { + pub(crate) node_id: i64, + pub(crate) scheduling_policy: String, + pub(crate) listen_http_addr: String, + pub(crate) listen_http_port: i32, + pub(crate) listen_pg_addr: String, + pub(crate) listen_pg_port: i32, +} diff --git a/control_plane/attachment_service/src/schema.rs b/control_plane/attachment_service/src/schema.rs new file mode 100644 index 0000000000..de80fc8f64 --- /dev/null +++ b/control_plane/attachment_service/src/schema.rs @@ -0,0 +1,27 @@ +// @generated automatically by Diesel CLI. + +diesel::table! { + nodes (node_id) { + node_id -> Int8, + scheduling_policy -> Varchar, + listen_http_addr -> Varchar, + listen_http_port -> Int4, + listen_pg_addr -> Varchar, + listen_pg_port -> Int4, + } +} + +diesel::table! { + tenant_shards (tenant_id, shard_number, shard_count) { + tenant_id -> Varchar, + shard_number -> Int4, + shard_count -> Int4, + shard_stripe_size -> Int4, + generation -> Int4, + generation_pageserver -> Int8, + placement_policy -> Varchar, + config -> Text, + } +} + +diesel::allow_tables_to_appear_in_same_query!(nodes, tenant_shards,); diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs index c9ed07ae5f..ec56dc8ad4 100644 --- a/control_plane/attachment_service/src/service.rs +++ b/control_plane/attachment_service/src/service.rs @@ -11,6 +11,7 @@ use control_plane::attachment_service::{ TenantCreateResponseShard, TenantLocateResponse, TenantLocateResponseShard, TenantShardMigrateRequest, TenantShardMigrateResponse, }; +use diesel::result::DatabaseErrorKind; use hyper::StatusCode; use pageserver_api::{ control_api::{ @@ -26,6 +27,7 @@ use pageserver_api::{ }; use pageserver_client::mgmt_api; use utils::{ + completion::Barrier, generation::Generation, http::error::ApiError, id::{NodeId, TenantId}, @@ -35,7 +37,7 @@ use utils::{ use crate::{ compute_hook::ComputeHook, node::Node, - persistence::{Persistence, TenantShardPersistence}, + persistence::{DatabaseError, Persistence, TenantShardPersistence}, scheduler::Scheduler, tenant_state::{ IntentState, ObservedState, ObservedStateLocation, ReconcileResult, ReconcileWaitError, @@ -46,6 +48,10 @@ use crate::{ const RECONCILE_TIMEOUT: Duration = Duration::from_secs(30); +/// How long [`Service::startup_reconcile`] is allowed to take before it should give +/// up on unresponsive pageservers and proceed. +pub(crate) const STARTUP_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30); + // Top level state available to all HTTP handlers struct ServiceState { tenants: BTreeMap, @@ -79,10 +85,27 @@ pub struct Config { pub jwt_token: Option, } +impl From for ApiError { + fn from(err: DatabaseError) -> ApiError { + match err { + DatabaseError::Query(e) => ApiError::InternalServerError(e.into()), + // FIXME: ApiError doesn't have an Unavailable variant, but ShuttingDown maps to 503. + DatabaseError::Connection(_e) => ApiError::ShuttingDown, + DatabaseError::Logical(reason) => { + ApiError::InternalServerError(anyhow::anyhow!(reason)) + } + } + } +} + pub struct Service { inner: Arc>, config: Config, persistence: Arc, + + /// This waits for initial reconciliation with pageservers to complete. Until this barrier + /// passes, it isn't safe to do any actions that mutate tenants. 
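+    /// A sketch of how this barrier is wired up (see [`Service::spawn`] below): the startup task holds
+    /// the paired completion object, and `wait()` resolves once that task drops it:
+    /// ```ignore
+    /// let (startup_completion, startup_complete) = utils::completion::channel();
+    /// tokio::task::spawn(async move {
+    ///     let _completion = startup_completion; // released when startup_reconcile returns
+    ///     startup_reconcile_this.startup_reconcile().await
+    /// });
+    /// startup_complete.wait().await; // unblocks after startup reconciliation completes
+    /// ```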
+ pub(crate) startup_complete: Barrier, } impl From for ApiError { @@ -96,77 +119,32 @@ impl From for ApiError { } impl Service { - pub async fn spawn(config: Config, persistence: Arc) -> anyhow::Result> { - let (result_tx, mut result_rx) = tokio::sync::mpsc::unbounded_channel(); - - tracing::info!("Loading nodes from database..."); - let mut nodes = persistence.list_nodes().await?; - tracing::info!("Loaded {} nodes from database.", nodes.len()); - - tracing::info!("Loading shards from database..."); - let tenant_shard_persistence = persistence.list_tenant_shards().await?; - tracing::info!( - "Loaded {} shards from database.", - tenant_shard_persistence.len() - ); - - let mut tenants = BTreeMap::new(); - - for tsp in tenant_shard_persistence { - let tenant_shard_id = TenantShardId { - tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?, - shard_number: ShardNumber(tsp.shard_number as u8), - shard_count: ShardCount(tsp.shard_count as u8), - }; - let shard_identity = if tsp.shard_count == 0 { - ShardIdentity::unsharded() - } else { - ShardIdentity::new( - ShardNumber(tsp.shard_number as u8), - ShardCount(tsp.shard_count as u8), - ShardStripeSize(tsp.shard_stripe_size as u32), - )? - }; - let new_tenant = TenantState { - tenant_shard_id, - shard: shard_identity, - sequence: Sequence::initial(), - // Note that we load generation, but don't care about generation_pageserver. We will either end up finding - // our existing attached location and it will match generation_pageserver, or we will attach somewhere new - // and update generation_pageserver in the process. - generation: Generation::new(tsp.generation), - policy: serde_json::from_str(&tsp.placement_policy).unwrap(), - intent: IntentState::new(), - observed: ObservedState::new(), - config: serde_json::from_str(&tsp.config).unwrap(), - reconciler: None, - waiter: Arc::new(SeqWait::new(Sequence::initial())), - error_waiter: Arc::new(SeqWait::new(Sequence::initial())), - last_error: Arc::default(), - }; - - tenants.insert(tenant_shard_id, new_tenant); - } + pub fn get_config(&self) -> &Config { + &self.config + } + /// TODO: don't allow other API calls until this is done, don't start doing any background housekeeping + /// until this is done. + async fn startup_reconcile(&self) { // For all tenant shards, a vector of observed states on nodes (where None means // indeterminate, same as in [`ObservedStateLocation`]) let mut observed = HashMap::new(); + let nodes = { + let locked = self.inner.read().unwrap(); + locked.nodes.clone() + }; + // TODO: issue these requests concurrently - for node in &mut nodes { - let client = mgmt_api::Client::new(node.base_url(), config.jwt_token.as_deref()); + for node in nodes.values() { + let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref()); tracing::info!("Scanning shards on node {}...", node.id); match client.list_location_config().await { Err(e) => { tracing::warn!("Could not contact pageserver {} ({e})", node.id); - // TODO: be more tolerant, apply a generous 5-10 second timeout - // TODO: setting a node to Offline is a dramatic thing to do, and can - // prevent neon_local from starting up (it starts this service before - // any pageservers are running). It may make sense to give nodes - // a Pending state to accomodate this situation, and allow (but deprioritize) - // scheduling on Pending nodes. 
- //node.availability = NodeAvailability::Offline; + // TODO: be more tolerant, apply a generous 5-10 second timeout with retries, in case + // pageserver is being restarted at the same time as we are } Ok(listing) => { tracing::info!( @@ -174,7 +152,6 @@ impl Service { listing.tenant_shards.len(), node.id ); - node.availability = NodeAvailability::Active; for (tenant_shard_id, conf_opt) in listing.tenant_shards { observed.insert(tenant_shard_id, (node.id, conf_opt)); @@ -186,41 +163,46 @@ impl Service { let mut cleanup = Vec::new(); // Populate intent and observed states for all tenants, based on reported state on pageservers - for (tenant_shard_id, (node_id, observed_loc)) in observed { - let Some(tenant_state) = tenants.get_mut(&tenant_shard_id) else { - cleanup.push((tenant_shard_id, node_id)); - continue; - }; + let shard_count = { + let mut locked = self.inner.write().unwrap(); + for (tenant_shard_id, (node_id, observed_loc)) in observed { + let Some(tenant_state) = locked.tenants.get_mut(&tenant_shard_id) else { + cleanup.push((tenant_shard_id, node_id)); + continue; + }; - tenant_state - .observed - .locations - .insert(node_id, ObservedStateLocation { conf: observed_loc }); - } - - // State of nodes is now frozen, transform to a HashMap. - let mut nodes: HashMap = nodes.into_iter().map(|n| (n.id, n)).collect(); - - // Populate each tenant's intent state - let mut scheduler = Scheduler::new(&tenants, &nodes); - for (tenant_shard_id, tenant_state) in tenants.iter_mut() { - tenant_state.intent_from_observed(); - if let Err(e) = tenant_state.schedule(&mut scheduler) { - // Non-fatal error: we are unable to properly schedule the tenant, perhaps because - // not enough pageservers are available. The tenant may well still be available - // to clients. - tracing::error!("Failed to schedule tenant {tenant_shard_id} at startup: {e}"); + tenant_state + .observed + .locations + .insert(node_id, ObservedStateLocation { conf: observed_loc }); } - } + + // Populate each tenant's intent state + let mut scheduler = Scheduler::new(&locked.tenants, &nodes); + for (tenant_shard_id, tenant_state) in locked.tenants.iter_mut() { + tenant_state.intent_from_observed(); + if let Err(e) = tenant_state.schedule(&mut scheduler) { + // Non-fatal error: we are unable to properly schedule the tenant, perhaps because + // not enough pageservers are available. The tenant may well still be available + // to clients. + tracing::error!("Failed to schedule tenant {tenant_shard_id} at startup: {e}"); + } + } + + locked.tenants.len() + }; + + // TODO: if any tenant's intent now differs from its loaded generation_pageserver, we should clear that + // generation_pageserver in the database. // Clean up any tenants that were found on pageservers but are not known to us. for (tenant_shard_id, node_id) in cleanup { // A node reported a tenant_shard_id which is unknown to us: detach it. let node = nodes - .get_mut(&node_id) + .get(&node_id) .expect("Always exists: only known nodes are scanned"); - let client = mgmt_api::Client::new(node.base_url(), config.jwt_token.as_deref()); + let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref()); match client .location_config( tenant_shard_id, @@ -252,13 +234,80 @@ impl Service { } } - let shard_count = tenants.len(); + // Finally, now that the service is up and running, launch reconcile operations for any tenants + // which require it: under normal circumstances this should only include tenants that were in some + // transient state before we restarted. 
+ let reconcile_tasks = self.reconcile_all(); + tracing::info!("Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)"); + } + + pub async fn spawn(config: Config, persistence: Arc) -> anyhow::Result> { + let (result_tx, mut result_rx) = tokio::sync::mpsc::unbounded_channel(); + + tracing::info!("Loading nodes from database..."); + let nodes = persistence.list_nodes().await?; + let nodes: HashMap = nodes.into_iter().map(|n| (n.id, n)).collect(); + tracing::info!("Loaded {} nodes from database.", nodes.len()); + + tracing::info!("Loading shards from database..."); + let tenant_shard_persistence = persistence.list_tenant_shards().await?; + tracing::info!( + "Loaded {} shards from database.", + tenant_shard_persistence.len() + ); + + let mut tenants = BTreeMap::new(); + + for tsp in tenant_shard_persistence { + let tenant_shard_id = TenantShardId { + tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?, + shard_number: ShardNumber(tsp.shard_number as u8), + shard_count: ShardCount(tsp.shard_count as u8), + }; + let shard_identity = if tsp.shard_count == 0 { + ShardIdentity::unsharded() + } else { + ShardIdentity::new( + ShardNumber(tsp.shard_number as u8), + ShardCount(tsp.shard_count as u8), + ShardStripeSize(tsp.shard_stripe_size as u32), + )? + }; + + // We will populate intent properly later in [`Self::startup_reconcile`], initially populate + // it with what we can infer: the node for which a generation was most recently issued. + let mut intent = IntentState::new(); + if tsp.generation_pageserver != i64::MAX { + intent.attached = Some(NodeId(tsp.generation_pageserver as u64)) + } + + let new_tenant = TenantState { + tenant_shard_id, + shard: shard_identity, + sequence: Sequence::initial(), + generation: Generation::new(tsp.generation as u32), + policy: serde_json::from_str(&tsp.placement_policy).unwrap(), + intent, + observed: ObservedState::new(), + config: serde_json::from_str(&tsp.config).unwrap(), + reconciler: None, + waiter: Arc::new(SeqWait::new(Sequence::initial())), + error_waiter: Arc::new(SeqWait::new(Sequence::initial())), + last_error: Arc::default(), + }; + + tenants.insert(tenant_shard_id, new_tenant); + } + + let (startup_completion, startup_complete) = utils::completion::channel(); + let this = Arc::new(Self { inner: Arc::new(std::sync::RwLock::new(ServiceState::new( result_tx, nodes, tenants, ))), config, persistence, + startup_complete, }); let result_task_this = this.clone(); @@ -316,11 +365,13 @@ impl Service { } }); - // Finally, now that the service is up and running, launch reconcile operations for any tenants - // which require it: under normal circumstances this should only include tenants that were in some - // transient state before we restarted. 
- let reconcile_tasks = this.reconcile_all(); - tracing::info!("Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)"); + let startup_reconcile_this = this.clone(); + tokio::task::spawn(async move { + // Block the [`Service::startup_complete`] barrier until we're done + let _completion = startup_completion; + + startup_reconcile_this.startup_reconcile().await + }); Ok(this) } @@ -336,7 +387,6 @@ impl Service { let locked = self.inner.write().unwrap(); !locked.tenants.contains_key(&attach_req.tenant_shard_id) }; - if insert { let tsp = TenantShardPersistence { tenant_id: attach_req.tenant_shard_id.tenant_id.to_string(), @@ -344,22 +394,39 @@ impl Service { shard_count: attach_req.tenant_shard_id.shard_count.0 as i32, shard_stripe_size: 0, generation: 0, - generation_pageserver: None, + generation_pageserver: i64::MAX, placement_policy: serde_json::to_string(&PlacementPolicy::default()).unwrap(), config: serde_json::to_string(&TenantConfig::default()).unwrap(), }; - self.persistence.insert_tenant_shards(vec![tsp]).await?; + match self.persistence.insert_tenant_shards(vec![tsp]).await { + Err(e) => match e { + DatabaseError::Query(diesel::result::Error::DatabaseError( + DatabaseErrorKind::UniqueViolation, + _, + )) => { + tracing::info!( + "Raced with another request to insert tenant {}", + attach_req.tenant_shard_id + ) + } + _ => return Err(e.into()), + }, + Ok(()) => { + tracing::info!("Inserted shard {} in database", attach_req.tenant_shard_id); - let mut locked = self.inner.write().unwrap(); - locked.tenants.insert( - attach_req.tenant_shard_id, - TenantState::new( - attach_req.tenant_shard_id, - ShardIdentity::unsharded(), - PlacementPolicy::Single, - ), - ); + let mut locked = self.inner.write().unwrap(); + locked.tenants.insert( + attach_req.tenant_shard_id, + TenantState::new( + attach_req.tenant_shard_id, + ShardIdentity::unsharded(), + PlacementPolicy::Single, + ), + ); + tracing::info!("Inserted shard {} in memory", attach_req.tenant_shard_id); + } + } } let new_generation = if let Some(req_node_id) = attach_req.node_id { @@ -506,6 +573,14 @@ impl Service { id: req_tenant.id, valid, }); + } else { + // After tenant deletion, we may approve any validation. This avoids + // spurious warnings on the pageserver if it has pending LSN updates + // at the point a deletion happens. 
+ response.tenants.push(ValidateResponseTenant { + id: req_tenant.id, + valid: true, + }); } } response @@ -561,7 +636,7 @@ impl Service { shard_count: tenant_shard_id.shard_count.0 as i32, shard_stripe_size: create_req.shard_parameters.stripe_size.0 as i32, generation: 0, - generation_pageserver: None, + generation_pageserver: i64::MAX, placement_policy: serde_json::to_string(&placement_policy).unwrap(), config: serde_json::to_string(&create_req.config).unwrap(), }) @@ -967,10 +1042,7 @@ impl Service { availability: NodeAvailability::Active, }; // TODO: idempotency if the node already exists in the database - self.persistence - .insert_node(&new_node) - .await - .map_err(ApiError::InternalServerError)?; + self.persistence.insert_node(&new_node).await?; let mut locked = self.inner.write().unwrap(); let mut new_nodes = (*locked.nodes).clone(); diff --git a/control_plane/src/attachment_service.rs b/control_plane/src/attachment_service.rs index 2d43c46270..6602aa9a73 100644 --- a/control_plane/src/attachment_service.rs +++ b/control_plane/src/attachment_service.rs @@ -1,5 +1,11 @@ use crate::{background_process, local_env::LocalEnv}; -use camino::Utf8PathBuf; +use camino::{Utf8Path, Utf8PathBuf}; +use diesel::{ + backend::Backend, + query_builder::{AstPass, QueryFragment, QueryId}, + Connection, PgConnection, QueryResult, RunQueryDsl, +}; +use diesel_migrations::{HarnessWithOutput, MigrationHarness}; use hyper::Method; use pageserver_api::{ models::{ShardParameters, TenantCreateRequest, TimelineCreateRequest, TimelineInfo}, @@ -7,9 +13,9 @@ use pageserver_api::{ }; use pageserver_client::mgmt_api::ResponseErrorMessageExt; use postgres_backend::AuthType; -use postgres_connection::parse_host_port; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use std::{path::PathBuf, str::FromStr}; +use std::{env, str::FromStr}; +use tokio::process::Command; use tracing::instrument; use utils::{ auth::{Claims, Scope}, @@ -19,14 +25,17 @@ use utils::{ pub struct AttachmentService { env: LocalEnv, listen: String, - path: PathBuf, + path: Utf8PathBuf, jwt_token: Option, public_key_path: Option, + postgres_port: u16, client: reqwest::Client, } const COMMAND: &str = "attachment_service"; +const ATTACHMENT_SERVICE_POSTGRES_VERSION: u32 = 16; + #[derive(Serialize, Deserialize)] pub struct AttachHookRequest { pub tenant_shard_id: TenantShardId, @@ -169,7 +178,9 @@ pub struct TenantShardMigrateResponse {} impl AttachmentService { pub fn from_env(env: &LocalEnv) -> Self { - let path = env.base_data_dir.join("attachments.json"); + let path = Utf8PathBuf::from_path_buf(env.base_data_dir.clone()) + .unwrap() + .join("attachments.json"); // Makes no sense to construct this if pageservers aren't going to use it: assume // pageservers have control plane API set @@ -181,6 +192,13 @@ impl AttachmentService { listen_url.port().unwrap() ); + // Convention: NeonEnv in python tests reserves the next port after the control_plane_api + // port, for use by our captive postgres. + let postgres_port = listen_url + .port() + .expect("Control plane API setting should always have a port") + + 1; + // Assume all pageservers have symmetric auth configuration: this service // expects to use one JWT token to talk to all of them. 
diff --git a/control_plane/src/attachment_service.rs b/control_plane/src/attachment_service.rs
index 2d43c46270..6602aa9a73 100644
--- a/control_plane/src/attachment_service.rs
+++ b/control_plane/src/attachment_service.rs
@@ -1,5 +1,11 @@
use crate::{background_process, local_env::LocalEnv};
-use camino::Utf8PathBuf;
+use camino::{Utf8Path, Utf8PathBuf};
+use diesel::{
+ backend::Backend,
+ query_builder::{AstPass, QueryFragment, QueryId},
+ Connection, PgConnection, QueryResult, RunQueryDsl,
+};
+use diesel_migrations::{HarnessWithOutput, MigrationHarness};
use hyper::Method;
use pageserver_api::{
models::{ShardParameters, TenantCreateRequest, TimelineCreateRequest, TimelineInfo},
@@ -7,9 +13,9 @@ use pageserver_api::{
};
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use postgres_backend::AuthType;
-use postgres_connection::parse_host_port;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
-use std::{path::PathBuf, str::FromStr};
+use std::{env, str::FromStr};
+use tokio::process::Command;
use tracing::instrument;
use utils::{
auth::{Claims, Scope},
@@ -19,14 +25,17 @@ use utils::{
pub struct AttachmentService {
env: LocalEnv,
listen: String,
- path: PathBuf,
+ path: Utf8PathBuf,
jwt_token: Option<String>,
public_key_path: Option<Utf8PathBuf>,
+ postgres_port: u16,
client: reqwest::Client,
}
const COMMAND: &str = "attachment_service";
+const ATTACHMENT_SERVICE_POSTGRES_VERSION: u32 = 16;
+
#[derive(Serialize, Deserialize)]
pub struct AttachHookRequest {
pub tenant_shard_id: TenantShardId,
@@ -169,7 +178,9 @@ pub struct TenantShardMigrateResponse {}
impl AttachmentService {
pub fn from_env(env: &LocalEnv) -> Self {
- let path = env.base_data_dir.join("attachments.json");
+ let path = Utf8PathBuf::from_path_buf(env.base_data_dir.clone())
+ .unwrap()
+ .join("attachments.json");
// Makes no sense to construct this if pageservers aren't going to use it: assume
// pageservers have control plane API set
@@ -181,6 +192,13 @@ impl AttachmentService {
listen_url.port().unwrap()
);
+ // Convention: NeonEnv in python tests reserves the next port after the control_plane_api
+ // port, for use by our captive postgres.
+ let postgres_port = listen_url
+ .port()
+ .expect("Control plane API setting should always have a port")
+ + 1;
+
// Assume all pageservers have symmetric auth configuration: this service
// expects to use one JWT token to talk to all of them.
let ps_conf = env
@@ -209,6 +227,7 @@ impl AttachmentService {
listen,
jwt_token,
public_key_path,
+ postgres_port,
client: reqwest::ClientBuilder::new()
.build()
.expect("Failed to construct http client"),
@@ -220,13 +239,214 @@ impl AttachmentService {
.expect("non-Unicode path")
}
- pub async fn start(&self) -> anyhow::Result<()> {
- let path_str = self.path.to_string_lossy();
+ /// PIDFile for the postgres instance used to store attachment service state
+ fn postgres_pid_file(&self) -> Utf8PathBuf {
+ Utf8PathBuf::from_path_buf(
+ self.env
+ .base_data_dir
+ .join("attachment_service_postgres.pid"),
+ )
+ .expect("non-Unicode path")
+ }
- let mut args = vec!["-l", &self.listen, "-p", &path_str]
- .into_iter()
- .map(|s| s.to_string())
- .collect::<Vec<String>>();
+ /// In order to access database migrations, we need to find the Neon source tree
+ async fn find_source_root(&self) -> anyhow::Result<Utf8PathBuf> {
+ // We assume that either cwd or our binary is in the source tree. The former is usually
+ // true for automated test runners, the latter is usually true for developer workstations. Often
+ // both are true, which is fine.
+ let candidate_start_points = [
+ // Current working directory
+ Utf8PathBuf::from_path_buf(std::env::current_dir()?).unwrap(),
+ // Directory containing the binary we're running inside
+ Utf8PathBuf::from_path_buf(env::current_exe()?.parent().unwrap().to_owned()).unwrap(),
+ ];
+
+ // For each candidate start point, search through ancestors looking for a neon.git source tree root
+ for start_point in &candidate_start_points {
+ // Start from the build dir: assumes we are running out of a built neon source tree
+ for path in start_point.ancestors() {
+ // A crude approximation: the root of the source tree is whatever contains a "control_plane"
+ // subdirectory.
+ let control_plane = path.join("control_plane");
+ if tokio::fs::try_exists(&control_plane).await? {
+ return Ok(path.to_owned());
+ }
+ }
+ }
+
+ // Fall-through
+ Err(anyhow::anyhow!(
+ "Could not find control_plane src dir, after searching ancestors of {candidate_start_points:?}"
+ ))
+ }
+
+ /// Find the directory containing postgres binaries, such as `initdb` and `pg_ctl`
+ ///
+ /// This usually uses ATTACHMENT_SERVICE_POSTGRES_VERSION of postgres, but will fall back
+ /// to other versions if that one isn't found. Some automated tests create circumstances
+ /// where only one version is available in pg_distrib_dir, such as `test_remote_extensions`.
+ pub async fn get_pg_bin_dir(&self) -> anyhow::Result<Utf8PathBuf> {
+ let prefer_versions = [ATTACHMENT_SERVICE_POSTGRES_VERSION, 15, 14];
+
+ for v in prefer_versions {
+ let path = Utf8PathBuf::from_path_buf(self.env.pg_bin_dir(v)?).unwrap();
+ if tokio::fs::try_exists(&path).await? {
+ return Ok(path);
+ }
+ }
+
+ // Fall through
+ anyhow::bail!(
+ "Postgres binaries not found in {}",
+ self.env.pg_distrib_dir.display()
+ );
+ }
+
+ /// Readiness check for our postgres process
+ async fn pg_isready(&self, pg_bin_dir: &Utf8Path) -> anyhow::Result<bool> {
+ let bin_path = pg_bin_dir.join("pg_isready");
+ let args = ["-h", "localhost", "-p", &format!("{}", self.postgres_port)];
+ let exitcode = Command::new(bin_path).args(args).spawn()?.wait().await?;
+
+ Ok(exitcode.success())
+ }
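`pg_isready` exits with status zero once postgres accepts connections, and `background_process::start_process` (used further down) drives a closure like this as its readiness probe. As a standalone illustration of that polling contract, here is a generic retry wrapper; `wait_ready`, the attempt budget, and the sleep interval are illustrative assumptions, not part of the patch:

```rust
use std::future::Future;
use std::time::Duration;

/// Sketch: poll an async readiness probe (such as `pg_isready` above) until it
/// reports true, sleeping briefly between attempts, and give up after a budget.
async fn wait_ready<F, Fut>(mut probe: F, attempts: u32) -> anyhow::Result<()>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = anyhow::Result<bool>>,
{
    for _ in 0..attempts {
        if probe().await? {
            return Ok(());
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    anyhow::bail!("process did not become ready in time")
}
```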
+ /// Create our database if it doesn't exist, and run migrations.
+ ///
+ /// This function is equivalent to the `diesel setup` command in the diesel CLI. We implement
+ /// the same steps by hand to avoid imposing a dependency on installing diesel-cli for developers
+ /// who just want to run `cargo neon_local` without knowing about diesel.
+ ///
+ /// Returns the database url
+ pub async fn setup_database(&self) -> anyhow::Result<String> {
+ let database_url = format!(
+ "postgresql://localhost:{}/attachment_service",
+ self.postgres_port
+ );
+ println!("Running attachment service database setup...");
+
+ fn change_database_of_url(database_url: &str, default_database: &str) -> (String, String) {
+ let base = ::url::Url::parse(database_url).unwrap();
+ let database = base.path_segments().unwrap().last().unwrap().to_owned();
+ let mut new_url = base.join(default_database).unwrap();
+ new_url.set_query(base.query());
+ (database, new_url.into())
+ }
+
+ #[derive(Debug, Clone)]
+ pub struct CreateDatabaseStatement {
+ db_name: String,
+ }
+
+ impl CreateDatabaseStatement {
+ pub fn new(db_name: &str) -> Self {
+ CreateDatabaseStatement {
+ db_name: db_name.to_owned(),
+ }
+ }
+ }
+
+ impl<DB: Backend> QueryFragment<DB> for CreateDatabaseStatement {
+ fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, DB>) -> QueryResult<()> {
+ out.push_sql("CREATE DATABASE ");
+ out.push_identifier(&self.db_name)?;
+ Ok(())
+ }
+ }
+
+ impl<Conn> RunQueryDsl<Conn> for CreateDatabaseStatement {}
+
+ impl QueryId for CreateDatabaseStatement {
+ type QueryId = ();
+
+ const HAS_STATIC_QUERY_ID: bool = false;
+ }
+
+ if PgConnection::establish(&database_url).is_err() {
+ let (database, postgres_url) = change_database_of_url(&database_url, "postgres");
+ println!("Creating database: {database}");
+ let mut conn = PgConnection::establish(&postgres_url)?;
+ CreateDatabaseStatement::new(&database).execute(&mut conn)?;
+ }
+
+ let mut conn = PgConnection::establish(&database_url)?;
+
+ let migrations_dir = self
+ .find_source_root()
+ .await?
+ .join("control_plane/attachment_service/migrations");
+
+ let migrations = diesel_migrations::FileBasedMigrations::from_path(migrations_dir)?;
+ println!("Running migrations in {}", migrations.path().display());
+ HarnessWithOutput::write_to_stdout(&mut conn)
+ .run_pending_migrations(migrations)
+ .map(|_| ())
+ .map_err(|e| anyhow::anyhow!(e))?;
+
+ println!("Migrations complete");
+
+ Ok(database_url)
+ }
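Reading migrations from disk is why `find_source_root` exists at all. diesel_migrations also supports compiling migrations into the binary, which would avoid the source-tree lookup entirely; a sketch of that alternative, assuming the same migrations directory:

```rust
use diesel::PgConnection;
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};

// Sketch: bake the SQL migrations into the binary at compile time, so no
// source-tree lookup is needed at runtime.
pub const MIGRATIONS: EmbeddedMigrations =
    embed_migrations!("control_plane/attachment_service/migrations");

fn run_embedded_migrations(conn: &mut PgConnection) -> anyhow::Result<()> {
    conn.run_pending_migrations(MIGRATIONS)
        .map(|_| ())
        .map_err(|e| anyhow::anyhow!(e))
}
```

The trade-off is that embedded migrations only change when the binary is rebuilt, whereas the file-based approach above always applies whatever is in the working tree.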
+ pub async fn start(&self) -> anyhow::Result<()> {
+ // Start a vanilla Postgres process used by the attachment service for persistence.
+ let pg_data_path = Utf8PathBuf::from_path_buf(self.env.base_data_dir.clone())
+ .unwrap()
+ .join("attachment_service_db");
+ let pg_bin_dir = self.get_pg_bin_dir().await?;
+ let pg_log_path = pg_data_path.join("postgres.log");
+
+ if !tokio::fs::try_exists(&pg_data_path).await? {
+ // Initialize empty database
+ let initdb_path = pg_bin_dir.join("initdb");
+ let mut child = Command::new(&initdb_path)
+ .args(["-D", pg_data_path.as_ref()])
+ .spawn()
+ .expect("Failed to spawn initdb");
+ let status = child.wait().await?;
+ if !status.success() {
+ anyhow::bail!("initdb failed with status {status}");
+ }
+
+ tokio::fs::write(
+ &pg_data_path.join("postgresql.conf"),
+ format!("port = {}", self.postgres_port),
+ )
+ .await?;
+ };
+
+ println!("Starting attachment service database...");
+ let db_start_args = [
+ "-w",
+ "-D",
+ pg_data_path.as_ref(),
+ "-l",
+ pg_log_path.as_ref(),
+ "start",
+ ];
+
+ background_process::start_process(
+ "attachment_service_db",
+ &self.env.base_data_dir,
+ pg_bin_dir.join("pg_ctl").as_std_path(),
+ db_start_args,
+ [],
+ background_process::InitialPidFile::Create(self.postgres_pid_file()),
+ || self.pg_isready(&pg_bin_dir),
+ )
+ .await?;
+
+ // Run migrations on every startup, in case something changed.
+ let database_url = self.setup_database().await?;
+
+ let mut args = vec![
+ "-l",
+ &self.listen,
+ "-p",
+ self.path.as_ref(),
+ "--database-url",
+ &database_url,
+ ]
+ .into_iter()
+ .map(|s| s.to_string())
+ .collect::<Vec<String>>();
if let Some(jwt_token) = &self.jwt_token {
args.push(format!("--jwt-token={jwt_token}"));
}
@@ -235,7 +455,7 @@ impl AttachmentService {
args.push(format!("--public-key={public_key_path}"));
}
- let result = background_process::start_process(
+ background_process::start_process(
COMMAND,
&self.env.base_data_dir,
&self.env.attachment_service_bin(),
@@ -252,30 +472,46 @@ impl AttachmentService {
}
},
)
- .await;
-
- // TODO: shouldn't we bail if we fail to spawn the process?
- for ps_conf in &self.env.pageservers {
- let (pg_host, pg_port) =
- parse_host_port(&ps_conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
- let (http_host, http_port) = parse_host_port(&ps_conf.listen_http_addr)
- .expect("Unable to parse listen_http_addr");
- self.node_register(NodeRegisterRequest {
- node_id: ps_conf.id,
- listen_pg_addr: pg_host.to_string(),
- listen_pg_port: pg_port.unwrap_or(5432),
- listen_http_addr: http_host.to_string(),
- listen_http_port: http_port.unwrap_or(80),
- })
.await?;
+ Ok(())
+ }
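Taken together, `start` boots the captive postgres (running `initdb` on first use), applies migrations, and only then spawns the service binary with `--database-url`; `stop` (next) unwinds both processes. A hypothetical caller, assuming these crate paths, would drive the lifecycle like this:

```rust
use control_plane::attachment_service::AttachmentService;
use control_plane::local_env::LocalEnv;

/// Sketch: start the attachment service (and its captive postgres), then shut
/// both down again. Crate paths are assumptions based on this diff.
async fn cycle_attachment_service(env: &LocalEnv) -> anyhow::Result<()> {
    let svc = AttachmentService::from_env(env);
    // initdb on first run, pg_ctl start, migrations, then the service binary.
    svc.start().await?;
    // Graceful stop of the service, then pg_ctl stop for its postgres.
    svc.stop(false).await?;
    Ok(())
}
```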
+
+ pub async fn stop(&self, immediate: bool) -> anyhow::Result<()> {
+ background_process::stop_process(immediate, COMMAND, &self.pid_file())?;
+
+ let pg_data_path = self.env.base_data_dir.join("attachment_service_db");
+ let pg_bin_dir = self.get_pg_bin_dir().await?;
+
+ println!("Stopping attachment service database...");
+ let pg_stop_args = ["-D", &pg_data_path.to_string_lossy(), "stop"];
+ let stop_status = Command::new(pg_bin_dir.join("pg_ctl"))
+ .args(pg_stop_args)
+ .spawn()?
+ .wait()
+ .await?;
+ if !stop_status.success() {
+ let pg_status_args = ["-D", &pg_data_path.to_string_lossy(), "status"];
+ let status_exitcode = Command::new(pg_bin_dir.join("pg_ctl"))
+ .args(pg_status_args)
+ .spawn()?
+ .wait()
+ .await?;
+
+ // pg_ctl status returns this exit code if postgres is not running: in this case it is
+ // fine that stop failed. Otherwise it is an error that stop failed.
+ const PG_STATUS_NOT_RUNNING: i32 = 3;
+ if Some(PG_STATUS_NOT_RUNNING) == status_exitcode.code() {
+ println!("Attachment service database is already stopped");
+ return Ok(());
+ } else {
+ anyhow::bail!("Failed to stop attachment service database: {stop_status}")
+ }
+ }
}
- result
+ Ok(())
}
- pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
- background_process::stop_process(immediate, COMMAND, &self.pid_file())
- }
/// Simple HTTP request wrapper for calling into attachment service
async fn dispatch(
&self,
@@ -357,7 +593,7 @@ impl AttachmentService {
&self,
req: TenantCreateRequest,
) -> anyhow::Result<TenantCreateResponse> {
- self.dispatch(Method::POST, "tenant".to_string(), Some(req))
+ self.dispatch(Method::POST, "v1/tenant".to_string(), Some(req))
.await
}
@@ -414,7 +650,7 @@ impl AttachmentService {
) -> anyhow::Result<TimelineInfo> {
self.dispatch(
Method::POST,
- format!("tenant/{tenant_id}/timeline"),
+ format!("v1/tenant/{tenant_id}/timeline"),
Some(req),
)
.await
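The `stop` path above distinguishes "stop failed" from "was never running" via the exit status of `pg_ctl status`, which per the pg_ctl documentation uses 0 for a running server, 3 for no running server, and 4 for an inaccessible data directory. A sketch that names those cases explicitly (the enum is illustrative, not part of the patch):

```rust
/// Sketch: name the documented `pg_ctl status` exit codes explicitly.
/// 0 = server running, 3 = no server running, 4 = data directory inaccessible.
#[derive(Debug, PartialEq, Eq)]
enum PgCtlStatus {
    Running,
    NotRunning,
    BadDataDir,
    Other(i32),
}

fn classify_pg_ctl_status(code: Option<i32>) -> PgCtlStatus {
    match code {
        Some(0) => PgCtlStatus::Running,
        Some(3) => PgCtlStatus::NotRunning,
        Some(4) => PgCtlStatus::BadDataDir,
        Some(n) => PgCtlStatus::Other(n),
        // Terminated by a signal, so there is no exit code at all.
        None => PgCtlStatus::Other(-1),
    }
}
```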
diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs
index 279c47398f..a5242e3dc7 100644
--- a/control_plane/src/bin/neon_local.rs
+++ b/control_plane/src/bin/neon_local.rs
@@ -135,7 +135,7 @@ fn main() -> Result<()> {
"tenant" => rt.block_on(handle_tenant(sub_args, &mut env)),
"timeline" => rt.block_on(handle_timeline(sub_args, &mut env)),
"start" => rt.block_on(handle_start_all(sub_args, &env)),
- "stop" => handle_stop_all(sub_args, &env),
+ "stop" => rt.block_on(handle_stop_all(sub_args, &env)),
"pageserver" => rt.block_on(handle_pageserver(sub_args, &env)),
"attachment_service" => rt.block_on(handle_attachment_service(sub_args, &env)),
"safekeeper" => rt.block_on(handle_safekeeper(sub_args, &env)),
@@ -1056,8 +1056,9 @@ fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageServerNode> {
async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
match sub_match.subcommand() {
Some(("start", subcommand_args)) => {
+ let register = subcommand_args.get_one::<bool>("register").unwrap_or(&true);
if let Err(e) = get_pageserver(env, subcommand_args)?
- .start(&pageserver_config_overrides(subcommand_args))
+ .start(&pageserver_config_overrides(subcommand_args), *register)
.await
{
eprintln!("pageserver start failed: {e}");
exit(1);
}
@@ -1086,24 +1087,7 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
}
if let Err(e) = pageserver
- .start(&pageserver_config_overrides(subcommand_args))
- .await
- {
- eprintln!("pageserver start failed: {e}");
- exit(1);
- }
- }
-
- Some(("migrate", subcommand_args)) => {
- let pageserver = get_pageserver(env, subcommand_args)?;
- //TODO what shutdown strategy should we use here?
- if let Err(e) = pageserver.stop(false) {
- eprintln!("pageserver stop failed: {}", e);
- exit(1);
- }
-
- if let Err(e) = pageserver
- .start(&pageserver_config_overrides(subcommand_args))
+ .start(&pageserver_config_overrides(subcommand_args), false)
.await
{
eprintln!("pageserver start failed: {e}");
exit(1);
}
@@ -1161,7 +1145,7 @@ async fn handle_attachment_service(
.map(|s| s.as_str())
== Some("immediate");
- if let Err(e) = svc.stop(immediate) {
+ if let Err(e) = svc.stop(immediate).await {
eprintln!("stop failed: {}", e);
exit(1);
}
@@ -1257,7 +1241,7 @@ async fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let attachment_service = AttachmentService::from_env(env);
if let Err(e) = attachment_service.start().await {
eprintln!("attachment_service start failed: {:#}", e);
- try_stop_all(env, true);
+ try_stop_all(env, true).await;
exit(1);
}
}
@@ -1265,11 +1249,11 @@ async fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
for ps_conf in &env.pageservers {
let pageserver = PageServerNode::from_env(env, ps_conf);
if let Err(e) = pageserver
- .start(&pageserver_config_overrides(sub_match))
+ .start(&pageserver_config_overrides(sub_match), true)
.await
{
eprintln!("pageserver {} start failed: {:#}", ps_conf.id, e);
- try_stop_all(env, true);
+ try_stop_all(env, true).await;
exit(1);
}
}
@@ -1278,23 +1262,23 @@ async fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let safekeeper = SafekeeperNode::from_env(env, node);
if let Err(e) = safekeeper.start(vec![]).await {
eprintln!("safekeeper {} start failed: {:#}", safekeeper.id, e);
- try_stop_all(env, false);
+ try_stop_all(env, false).await;
exit(1);
}
}
Ok(())
}
-fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
+async fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let immediate =
sub_match.get_one::<String>("stop-mode").map(|s| s.as_str()) == Some("immediate");
- try_stop_all(env, immediate);
+ try_stop_all(env, immediate).await;
Ok(())
}
-fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
+async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
// Stop all endpoints
match ComputeControlPlane::load(env.clone()) {
Ok(cplane) => {
@@ -1329,7 +1313,7 @@ fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
if env.control_plane_api.is_some() {
let attachment_service = AttachmentService::from_env(env);
- if let Err(e) = attachment_service.stop(immediate) {
+ if let Err(e) = attachment_service.stop(immediate).await {
eprintln!("attachment service stop failed: {e:#}");
}
}
@@ -1549,7 +1533,11 @@ fn cli() -> Command {
.subcommand(Command::new("status"))
.subcommand(Command::new("start")
.about("Start local pageserver")
- .arg(pageserver_config_args.clone())
+ .arg(pageserver_config_args.clone()).arg(Arg::new("register")
+ .long("register")
+ .default_value("true").required(false)
+ .value_parser(value_parser!(bool))
+ .value_name("register"))
)
.subcommand(Command::new("stop")
.about("Stop local pageserver")
diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs
index 4460fdd3a6..aefef47da7 100644
--- a/control_plane/src/local_env.rs
+++ b/control_plane/src/local_env.rs
@@ -223,7 +223,11 @@ impl LocalEnv {
}
pub fn attachment_service_bin(&self) -> PathBuf {
- self.neon_distrib_dir.join("attachment_service")
+ // Irrespective of configuration, the attachment service binary is always
+ // run from the same location as neon_local. This means that compatibility
+ // tests that run an old pageserver/safekeeper still use the latest attachment service.
+ let neon_local_bin_dir = env::current_exe().unwrap().parent().unwrap().to_owned();
+ neon_local_bin_dir.join("attachment_service")
}
pub fn safekeeper_bin(&self) -> PathBuf {
diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs
index 1db21c9a37..540d1185a2 100644
--- a/control_plane/src/pageserver.rs
+++ b/control_plane/src/pageserver.rs
@@ -30,6 +30,7 @@ use utils::{
lsn::Lsn,
};
+use crate::attachment_service::{AttachmentService, NodeRegisterRequest};
use crate::local_env::PageServerConf;
use crate::{background_process, local_env::LocalEnv};
@@ -161,8 +162,8 @@ impl PageServerNode {
.expect("non-Unicode path")
}
- pub async fn start(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
- self.start_node(config_overrides, false).await
+ pub async fn start(&self, config_overrides: &[&str], register: bool) -> anyhow::Result<()> {
+ self.start_node(config_overrides, false, register).await
}
fn pageserver_init(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
@@ -207,6 +208,7 @@ impl PageServerNode {
&self,
config_overrides: &[&str],
update_config: bool,
+ register: bool,
) -> anyhow::Result<()> {
// TODO: using a thread here because start_process() is not async but we need to call check_status()
let datadir = self.repo_path();
@@ -244,7 +246,26 @@ impl PageServerNode {
}
},
)
- .await
+ .await?;
+
+ if register {
+ let attachment_service = AttachmentService::from_env(&self.env);
+ let (pg_host, pg_port) =
+ parse_host_port(&self.conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
+ let (http_host, http_port) = parse_host_port(&self.conf.listen_http_addr)
+ .expect("Unable to parse listen_http_addr");
+ attachment_service
+ .node_register(NodeRegisterRequest {
+ node_id: self.conf.id,
+ listen_pg_addr: pg_host.to_string(),
+ listen_pg_port: pg_port.unwrap_or(5432),
+ listen_http_addr: http_host.to_string(),
+ listen_http_port: http_port.unwrap_or(80),
+ })
+ .await?;
+ }
+
+ Ok(())
}
fn pageserver_basic_args<'a>(
diff --git a/diesel.toml b/diesel.toml
new file mode 100644
index 0000000000..30ed4444d7
--- /dev/null
+++ b/diesel.toml
@@ -0,0 +1,9 @@
+# For documentation on how to configure this file,
+# see https://diesel.rs/guides/configuring-diesel-cli
+
+[print_schema]
+file = "control_plane/attachment_service/src/schema.rs"
+custom_type_derives = ["diesel::query_builder::QueryId"]
+
+[migrations_directory]
+dir = "control_plane/attachment_service/migrations"
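The `[print_schema]` section above tells diesel-cli where to write the generated schema used by the new persistence code. For readers unfamiliar with diesel, `diesel print-schema` emits `table!` macros roughly like the following sketch; the column list mirrors the `TenantShardPersistence` fields seen earlier in this diff, and the SQL types are assumptions:

```rust
// Sketch of what diesel-cli writes into schema.rs; columns mirror
// TenantShardPersistence, and the SQL types here are assumptions.
diesel::table! {
    tenant_shards (tenant_id, shard_number, shard_count) {
        tenant_id -> Varchar,
        shard_number -> Int4,
        shard_count -> Int4,
        shard_stripe_size -> Int4,
        generation -> Int4,
        generation_pageserver -> Int8,
        placement_policy -> Varchar,
        config -> Text,
    }
}
```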
diff --git a/libs/utils/src/crashsafe.rs b/libs/utils/src/crashsafe.rs
index 0c6855d17b..b089af4a02 100644
--- a/libs/utils/src/crashsafe.rs
+++ b/libs/utils/src/crashsafe.rs
@@ -1,7 +1,7 @@
use std::{
borrow::Cow,
fs::{self, File},
- io::{self, Write},
+ io,
};
use camino::{Utf8Path, Utf8PathBuf};
@@ -112,48 +112,6 @@ pub async fn fsync_async(path: impl AsRef<Utf8Path>) -> Result<(), std::io::Error> {
tokio::fs::File::open(path.as_ref()).await?.sync_all().await
}
-/// Writes a file to the specified `final_path` in a crash safe fasion
-///
-/// The file is first written to the specified tmp_path, and in a second
-/// step, the tmp path is renamed to the final path. As renames are
-/// atomic, a crash during the write operation will never leave behind a
-/// partially written file.
-///
-/// NB: an async variant of this code exists in Pageserver's VirtualFile.
-pub fn overwrite(
- final_path: &Utf8Path,
- tmp_path: &Utf8Path,
- content: &[u8],
-) -> std::io::Result<()> {
- let Some(final_path_parent) = final_path.parent() else {
- return Err(std::io::Error::from_raw_os_error(
- nix::errno::Errno::EINVAL as i32,
- ));
- };
- std::fs::remove_file(tmp_path).or_else(crate::fs_ext::ignore_not_found)?;
- let mut file = std::fs::OpenOptions::new()
- .write(true)
- // Use `create_new` so that, if we race with ourselves or something else,
- // we bail out instead of causing damage.
- .create_new(true)
- .open(tmp_path)?;
- file.write_all(content)?;
- file.sync_all()?;
- drop(file); // before the rename, that's important!
- // renames are atomic
- std::fs::rename(tmp_path, final_path)?;
- // Only open final path parent dirfd now, so that this operation only
- // ever holds one VirtualFile fd at a time. That's important because
- // the current `find_victim_slot` impl might pick the same slot for both
- // VirtualFile., and it eventually does a blocking write lock instead of
- // try_lock.
- let final_parent_dirfd = std::fs::OpenOptions::new()
- .read(true)
- .open(final_path_parent)?;
- final_parent_dirfd.sync_all()?;
- Ok(())
-}
#[cfg(test)]
mod tests {
diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs
index d200a4ba5e..066f06c88f 100644
--- a/pageserver/src/virtual_file.rs
+++ b/pageserver/src/virtual_file.rs
@@ -403,7 +403,12 @@ impl VirtualFile {
Ok(vfile)
}
- /// Async & [`VirtualFile`]-enabled version of [`::utils::crashsafe::overwrite`].
+ /// Writes a file to the specified `final_path` in a crash safe fashion
+ ///
+ /// The file is first written to the specified tmp_path, and in a second
+ /// step, the tmp path is renamed to the final path. As renames are
+ /// atomic, a crash during the write operation will never leave behind a
+ /// partially written file.
pub async fn crashsafe_overwrite(
final_path: &Utf8Path,
tmp_path: &Utf8Path,
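The doc comment moved onto `crashsafe_overwrite` describes the classic write-to-temp-then-rename recipe that the deleted sync function implemented. Written against plain tokio rather than `VirtualFile`, the same recipe looks roughly like this; `overwrite_atomic` is a hypothetical stand-in, not the pageserver implementation:

```rust
use std::path::Path;
use tokio::io::AsyncWriteExt;

// Sketch of crash-safe overwrite with plain tokio: write to a temp file,
// fsync it, then atomically rename over the destination and fsync the
// parent directory so the rename itself is durable.
async fn overwrite_atomic(
    final_path: &Path,
    tmp_path: &Path,
    content: &[u8],
) -> std::io::Result<()> {
    // Remove any stale temp file from a previous crash; ignore "not found".
    match tokio::fs::remove_file(tmp_path).await {
        Ok(()) => {}
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
        Err(e) => return Err(e),
    }
    let mut file = tokio::fs::OpenOptions::new()
        .write(true)
        .create_new(true) // bail out if we race with another writer
        .open(tmp_path)
        .await?;
    file.write_all(content).await?;
    file.sync_all().await?;
    drop(file); // close before the rename
    tokio::fs::rename(tmp_path, final_path).await?;
    // Durably record the rename by fsyncing the parent directory
    // (opening a directory read-only for fsync is a Unix-ism).
    if let Some(parent) = final_path.parent() {
        tokio::fs::File::open(parent).await?.sync_all().await?;
    }
    Ok(())
}
```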
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 142c97d5c3..bbabfeedf6 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -2,6 +2,7 @@ from __future__ import annotations
import abc
import asyncio
+import concurrent.futures
import filecmp
import json
import os
@@ -993,6 +994,11 @@ class NeonEnv:
self.initial_timeline = config.initial_timeline
attachment_service_port = self.port_distributor.get_port()
+ # Reserve the next port after attachment service for use by its postgres: this
+ # will assert out if the next port wasn't free.
+ attachment_service_pg_port = self.port_distributor.get_port()
+ assert attachment_service_pg_port == attachment_service_port + 1
+
self.control_plane_api: str = f"http://127.0.0.1:{attachment_service_port}"
self.attachment_service: NeonAttachmentService = NeonAttachmentService(
self, config.auth_enabled
)
@@ -1071,16 +1077,27 @@ class NeonEnv:
self.neon_cli.init(cfg, force=config.config_init_force)
def start(self):
- # Start up broker, pageserver and all safekeepers
- self.broker.try_start()
-
+ # Attachment service starts first, so that pageserver /re-attach calls don't
+ # bounce through retries on startup
self.attachment_service.start()
- for pageserver in self.pageservers:
- pageserver.start()
+ # Start up broker, pageserver and all safekeepers
+ futs = []
+ with concurrent.futures.ThreadPoolExecutor(
+ max_workers=2 + len(self.pageservers) + len(self.safekeepers)
+ ) as executor:
+ futs.append(
+ executor.submit(lambda: self.broker.try_start() or None)
+ ) # The `or None` is for the linter
- for safekeeper in self.safekeepers:
- safekeeper.start()
+ for pageserver in self.pageservers:
+ futs.append(executor.submit(lambda ps=pageserver: ps.start()))
+
+ for safekeeper in self.safekeepers:
+ futs.append(executor.submit(lambda sk=safekeeper: sk.start()))
+
+ for f in futs:
+ f.result()
def stop(self, immediate=False, ps_assert_metric_no_errors=False):
"""
@@ -1652,8 +1669,10 @@ class NeonCli(AbstractNeonCli):
id: int,
overrides: Tuple[str, ...] = (),
extra_env_vars: Optional[Dict[str, str]] = None,
+ register: bool = True,
) -> "subprocess.CompletedProcess[str]":
- start_args = ["pageserver", "start", f"--id={id}", *overrides]
+ register_str = "true" if register else "false"
+ start_args = ["pageserver", "start", f"--id={id}", *overrides, f"--register={register_str}"]
storage = self.env.pageserver_remote_storage
append_pageserver_param_overrides(
params_to_update=start_args,
@@ -2080,6 +2099,7 @@ class NeonPageserver(PgProtocol):
self,
overrides: Tuple[str, ...] = (),
extra_env_vars: Optional[Dict[str, str]] = None,
+ register: bool = True,
) -> "NeonPageserver":
"""
Start the page server.
@@ -2089,7 +2109,7 @@
assert self.running is False
self.env.neon_cli.pageserver_start(
- self.id, overrides=overrides, extra_env_vars=extra_env_vars
+ self.id, overrides=overrides, extra_env_vars=extra_env_vars, register=register
)
self.running = True
return self
diff --git a/test_runner/regress/test_compatibility.py b/test_runner/regress/test_compatibility.py
index 1a1425f069..d5d70951be 100644
--- a/test_runner/regress/test_compatibility.py
+++ b/test_runner/regress/test_compatibility.py
@@ -138,6 +138,7 @@ def test_create_snapshot(
for sk in env.safekeepers:
sk.stop()
env.pageserver.stop()
+ env.attachment_service.stop()
# Directory `compatibility_snapshot_dir` is uploaded to S3 in a workflow, keep the name in sync with it
compatibility_snapshot_dir = (
@@ -226,11 +227,17 @@ def test_forward_compatibility(
try:
neon_env_builder.num_safekeepers = 3
+ neon_local_binpath = neon_env_builder.neon_binpath
env = neon_env_builder.from_repo_dir(
compatibility_snapshot_dir / "repo",
neon_binpath=compatibility_neon_bin,
pg_distrib_dir=compatibility_postgres_distrib_dir,
)
+
+ # Use current neon_local even though we're using old binaries for
+ # everything else: our test code is written for latest CLI args.
+ env.neon_local_binpath = neon_local_binpath + neon_env_builder.start() check_neon_works( diff --git a/test_runner/regress/test_pageserver_generations.py b/test_runner/regress/test_pageserver_generations.py index 63f6130af5..725ed63d1c 100644 --- a/test_runner/regress/test_pageserver_generations.py +++ b/test_runner/regress/test_pageserver_generations.py @@ -499,7 +499,8 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): # and serve clients. env.pageserver.stop() # Non-immediate: implicitly checking that shutdown doesn't hang waiting for CP env.pageserver.start( - overrides=("--pageserver-config-override=control_plane_emergency_mode=true",) + overrides=("--pageserver-config-override=control_plane_emergency_mode=true",), + register=False, ) # The pageserver should provide service to clients diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index b72e0f3c26..9d0f9bfcee 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -29,6 +29,7 @@ chrono = { version = "0.4", default-features = false, features = ["clock", "serd clap = { version = "4", features = ["derive", "string"] } clap_builder = { version = "4", default-features = false, features = ["color", "help", "std", "string", "suggestions", "usage"] } crossbeam-utils = { version = "0.8" } +diesel = { version = "2", features = ["postgres", "serde_json"] } either = { version = "1" } fail = { version = "0.5", default-features = false, features = ["failpoints"] } futures-channel = { version = "0.3", features = ["sink"] } @@ -108,8 +109,10 @@ regex-automata = { version = "0.4", default-features = false, features = ["dfa-o regex-syntax = { version = "0.8" } serde = { version = "1", features = ["alloc", "derive"] } syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-traits", "full", "visit"] } -syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "full", "visit", "visit-mut"] } +syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "fold", "full", "visit", "visit-mut"] } time-macros = { version = "0.2", default-features = false, features = ["formatting", "parsing", "serde"] } +toml_datetime = { version = "0.6", default-features = false, features = ["serde"] } +toml_edit = { version = "0.19", features = ["serde"] } zstd = { version = "0.13" } zstd-safe = { version = "7", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] } zstd-sys = { version = "2", default-features = false, features = ["legacy", "std", "zdict_builder"] }