diff --git a/libs/pageserver_api/src/control_api.rs b/libs/pageserver_api/src/control_api.rs index 976ca0ee4a..3c1b1ffc3a 100644 --- a/libs/pageserver_api/src/control_api.rs +++ b/libs/pageserver_api/src/control_api.rs @@ -3,39 +3,39 @@ use serde::{Deserialize, Serialize}; use utils::id::{NodeId, TenantId}; #[derive(Serialize, Deserialize)] -struct ReAttachRequest { - node_id: NodeId, +pub struct ReAttachRequest { + pub node_id: NodeId, } #[derive(Serialize, Deserialize)] -struct ReAttachResponseTenant { - id: TenantId, - generation: u32, +pub struct ReAttachResponseTenant { + pub id: TenantId, + pub generation: u32, } #[derive(Serialize, Deserialize)] -struct ReAttachResponse { - tenants: Vec, +pub struct ReAttachResponse { + pub tenants: Vec, } #[derive(Serialize, Deserialize)] -struct ValidateRequestTenant { - id: TenantId, - gen: u32, +pub struct ValidateRequestTenant { + pub id: TenantId, + pub gen: u32, } #[derive(Serialize, Deserialize)] -struct ValidateRequest { - tenants: Vec, +pub struct ValidateRequest { + pub tenants: Vec, } #[derive(Serialize, Deserialize)] -struct ValidateResponse { - tenants: Vec, +pub struct ValidateResponse { + pub tenants: Vec, } #[derive(Serialize, Deserialize)] -struct ValidateResponseTenant { - id: TenantId, - valid: bool, +pub struct ValidateResponseTenant { + pub id: TenantId, + pub valid: bool, } diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 38c87a67a9..c38277c84c 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -1,10 +1,13 @@ //! This module acts as a switchboard to access different repositories managed by this //! page server. +use hyper::StatusCode; +use pageserver_api::control_api::{ReAttachRequest, ReAttachResponse}; use std::collections::{hash_map, HashMap}; use std::ffi::OsStr; use std::path::Path; use std::sync::Arc; +use std::time::Duration; use tokio::fs; use anyhow::Context; @@ -76,6 +79,75 @@ pub async fn init_tenant_mgr( let mut tenants = HashMap::new(); + // If we are configured to use the control plane API, then it is the source of truth for what to attach + let tenant_generations = conf + .control_plane_api + .as_ref() + .map(|control_plane_api| async { + let client = reqwest::ClientBuilder::new() + .build() + .expect("Failed to construct http client"); + + // FIXME: it's awkward that join() requires the base to have a trailing slash, makes + // it easy to get a config wrong + assert!( + control_plane_api.as_str().ends_with("/"), + "control plane API needs trailing slash" + ); + + let re_attach_path = control_plane_api + .join("re-attach") + .expect("Failed to build re-attach path"); + let request = ReAttachRequest { node_id: conf.id }; + + // TODO: we should have been passed a cancellation token, and use it to end + // this loop gracefully + loop { + let response = match client + .post(re_attach_path.clone()) + .json(&request) + .send() + .await + { + Err(e) => Err(anyhow::Error::from(e)), + Ok(r) => { + if r.status() == StatusCode::OK { + r.json::() + .await + .map_err(|e| anyhow::Error::from(e)) + } else { + Err(anyhow::anyhow!("Unexpected status {}", r.status())) + } + } + }; + + match response { + Ok(res) => { + tracing::info!( + "Received re-attach response with {0} tenants", + res.tenants.len() + ); + + // TODO: do something with it + break res + .tenants + .into_iter() + .map(|t| (t.id, t.generation)) + .collect::>(); + } + Err(e) => { + tracing::error!("Error re-attaching tenants, retrying: {e:#}"); + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + } + }); + + let tenant_generations = match tenant_generations { + Some(g) => Some(g.await), + None => None, + }; + let mut dir_entries = fs::read_dir(&tenants_dir) .await .with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?; @@ -123,12 +195,53 @@ pub async fn init_tenant_mgr( continue; } + let tenant_id = match tenant_dir_path + .file_name() + .and_then(OsStr::to_str) + .unwrap_or_default() + .parse::() + { + Ok(id) => id, + Err(_) => { + warn!( + "Invalid tenant path (garbage in our repo directory?): {0}", + tenant_dir_path.display() + ); + continue; + } + }; + + let generation = if let Some(generations) = &tenant_generations { + // We have a generation map: treat it as the authority for whether + // this tenant is really attached. + if let Some(gen) = generations.get(&tenant_id) { + Generation::new(*gen) + } else { + info!("Detaching tenant {0}, control plane omitted it in re-attach response", tenant_id); + if let Err(e) = fs::remove_dir_all(&tenant_dir_path).await { + error!( + "Failed to remove detached tenant directory '{}': {:?}", + tenant_dir_path.display(), + e + ); + } + continue; + } + } else { + // Legacy mode: no generation information, any tenant present + // on local disk may activate + info!( + "Starting tenant {0} in legacy mode, no generation", + tenant_dir_path.display() + ); + Generation::none() + }; + match schedule_local_tenant_processing( conf, + tenant_id, &tenant_dir_path, - // TODO: call into control plane, don't start Tenants - // purely because they are on disk - Generation::none(), + generation, resources.clone(), Some(init_order.clone()), &TENANTS, @@ -164,6 +277,7 @@ pub async fn init_tenant_mgr( pub(crate) fn schedule_local_tenant_processing( conf: &'static PageServerConf, + tenant_id: TenantId, tenant_path: &Path, generation: Generation, resources: TenantSharedResources, @@ -186,15 +300,6 @@ pub(crate) fn schedule_local_tenant_processing( "Cannot load tenant from empty directory {tenant_path:?}" ); - let tenant_id = tenant_path - .file_name() - .and_then(OsStr::to_str) - .unwrap_or_default() - .parse::() - .with_context(|| { - format!("Could not parse tenant id out of the tenant dir name in path {tenant_path:?}") - })?; - let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id); anyhow::ensure!( !conf.tenant_ignore_mark_file_path(&tenant_id).exists(), @@ -379,7 +484,7 @@ pub async fn create_tenant( remote_storage, }; let created_tenant = - schedule_local_tenant_processing(conf, &tenant_directory, + schedule_local_tenant_processing(conf, tenant_id, &tenant_directory, generation, tenant_resources, None, &TENANTS, ctx)?; // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. // See https://github.com/neondatabase/neon/issues/4233 @@ -545,7 +650,7 @@ pub async fn load_tenant( }; // TODO: remove the `/load` API once generation support is complete: // it becomes equivalent to attaching. - let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, Generation::none(), resources, None, &TENANTS, ctx) + let new_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_path, Generation::none(), resources, None, &TENANTS, ctx) .with_context(|| { format!("Failed to schedule tenant processing in path {tenant_path:?}") })?; @@ -631,7 +736,7 @@ pub async fn attach_tenant( broker_client, remote_storage: Some(remote_storage), }; - let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, generation, resources, None, &TENANTS, ctx)?; + let attached_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_dir, generation, resources, None, &TENANTS, ctx)?; // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. // See https://github.com/neondatabase/neon/issues/4233