pageserver: call into control plane on startup

This commit is contained in:
John Spray
2023-08-25 13:17:10 +01:00
parent 5b7d3e39d6
commit bc95b8f1f5
2 changed files with 137 additions and 32 deletions

View File

@@ -1,10 +1,13 @@
//! This module acts as a switchboard to access different repositories managed by this
//! page server.
use hyper::StatusCode;
use pageserver_api::control_api::{ReAttachRequest, ReAttachResponse};
use std::collections::{hash_map, HashMap};
use std::ffi::OsStr;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tokio::fs;
use anyhow::Context;
@@ -76,6 +79,75 @@ pub async fn init_tenant_mgr(
let mut tenants = HashMap::new();
// If we are configured to use the control plane API, then it is the source of truth for what to attach
let tenant_generations = conf
.control_plane_api
.as_ref()
.map(|control_plane_api| async {
let client = reqwest::ClientBuilder::new()
.build()
.expect("Failed to construct http client");
// FIXME: it's awkward that join() requires the base to have a trailing slash, makes
// it easy to get a config wrong
assert!(
control_plane_api.as_str().ends_with("/"),
"control plane API needs trailing slash"
);
let re_attach_path = control_plane_api
.join("re-attach")
.expect("Failed to build re-attach path");
let request = ReAttachRequest { node_id: conf.id };
// TODO: we should have been passed a cancellation token, and use it to end
// this loop gracefully
loop {
let response = match client
.post(re_attach_path.clone())
.json(&request)
.send()
.await
{
Err(e) => Err(anyhow::Error::from(e)),
Ok(r) => {
if r.status() == StatusCode::OK {
r.json::<ReAttachResponse>()
.await
.map_err(|e| anyhow::Error::from(e))
} else {
Err(anyhow::anyhow!("Unexpected status {}", r.status()))
}
}
};
match response {
Ok(res) => {
tracing::info!(
"Received re-attach response with {0} tenants",
res.tenants.len()
);
// TODO: do something with it
break res
.tenants
.into_iter()
.map(|t| (t.id, t.generation))
.collect::<HashMap<_, _>>();
}
Err(e) => {
tracing::error!("Error re-attaching tenants, retrying: {e:#}");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
});
let tenant_generations = match tenant_generations {
Some(g) => Some(g.await),
None => None,
};
let mut dir_entries = fs::read_dir(&tenants_dir)
.await
.with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
@@ -123,12 +195,53 @@ pub async fn init_tenant_mgr(
continue;
}
let tenant_id = match tenant_dir_path
.file_name()
.and_then(OsStr::to_str)
.unwrap_or_default()
.parse::<TenantId>()
{
Ok(id) => id,
Err(_) => {
warn!(
"Invalid tenant path (garbage in our repo directory?): {0}",
tenant_dir_path.display()
);
continue;
}
};
let generation = if let Some(generations) = &tenant_generations {
// We have a generation map: treat it as the authority for whether
// this tenant is really attached.
if let Some(gen) = generations.get(&tenant_id) {
Generation::new(*gen)
} else {
info!("Detaching tenant {0}, control plane omitted it in re-attach response", tenant_id);
if let Err(e) = fs::remove_dir_all(&tenant_dir_path).await {
error!(
"Failed to remove detached tenant directory '{}': {:?}",
tenant_dir_path.display(),
e
);
}
continue;
}
} else {
// Legacy mode: no generation information, any tenant present
// on local disk may activate
info!(
"Starting tenant {0} in legacy mode, no generation",
tenant_dir_path.display()
);
Generation::none()
};
match schedule_local_tenant_processing(
conf,
tenant_id,
&tenant_dir_path,
// TODO: call into control plane, don't start Tenants
// purely because they are on disk
Generation::none(),
generation,
resources.clone(),
Some(init_order.clone()),
&TENANTS,
@@ -164,6 +277,7 @@ pub async fn init_tenant_mgr(
pub(crate) fn schedule_local_tenant_processing(
conf: &'static PageServerConf,
tenant_id: TenantId,
tenant_path: &Path,
generation: Generation,
resources: TenantSharedResources,
@@ -186,15 +300,6 @@ pub(crate) fn schedule_local_tenant_processing(
"Cannot load tenant from empty directory {tenant_path:?}"
);
let tenant_id = tenant_path
.file_name()
.and_then(OsStr::to_str)
.unwrap_or_default()
.parse::<TenantId>()
.with_context(|| {
format!("Could not parse tenant id out of the tenant dir name in path {tenant_path:?}")
})?;
let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id);
anyhow::ensure!(
!conf.tenant_ignore_mark_file_path(&tenant_id).exists(),
@@ -379,7 +484,7 @@ pub async fn create_tenant(
remote_storage,
};
let created_tenant =
schedule_local_tenant_processing(conf, &tenant_directory,
schedule_local_tenant_processing(conf, tenant_id, &tenant_directory,
generation, tenant_resources, None, &TENANTS, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
@@ -545,7 +650,7 @@ pub async fn load_tenant(
};
// TODO: remove the `/load` API once generation support is complete:
// it becomes equivalent to attaching.
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, Generation::none(), resources, None, &TENANTS, ctx)
let new_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_path, Generation::none(), resources, None, &TENANTS, ctx)
.with_context(|| {
format!("Failed to schedule tenant processing in path {tenant_path:?}")
})?;
@@ -631,7 +736,7 @@ pub async fn attach_tenant(
broker_client,
remote_storage: Some(remote_storage),
};
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, generation, resources, None, &TENANTS, ctx)?;
let attached_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_dir, generation, resources, None, &TENANTS, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233