pageserver: generation number fetch on startup and use in /attach (#5163)

## Problem

- #5050 

Closes: https://github.com/neondatabase/neon/issues/5136

## Summary of changes

- A new configuration property `control_plane_api` gates all other
functionality in this PR: if it is unset (the default), everything
continues to work as it does today.
- If `control_plane_api` is set, then on startup we call the control
plane's `/re-attach` endpoint to discover our attachments and their
generations. If an attachment is missing from the response, we
implicitly detach the tenant.
- Calls to the pageserver `/attach` API may include a `generation`
parameter; if `control_plane_api` is set, this parameter is mandatory
(see the example request after this list).
- RemoteTimelineClient's loading of index_part.json is generation-aware,
and will try to load the index_part with the most recent generation <=
its own generation.
- The `neon_local` testing environment now includes a new binary
`attachment_service` which implements the endpoints that the pageserver
requires to operate. It is on by default when running `cargo neon` by
hand. In `test_runner/` tests it is off by default: existing tests
continue to run in the legacy generation-less mode.
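
As a concrete illustration of the new attach flow, here is a minimal
sketch of a caller supplying a generation when attaching a tenant. The
listen address, tenant id and generation are placeholder values, and the
body shape follows `TenantAttachRequest` (a `config` object plus an
optional `generation`); treat it as an example, not code from this PR.

```rust
// Sketch only: attach a tenant with an explicit generation number.
// Host/port and the generation value are illustrative.
use serde_json::json;

fn attach_with_generation(tenant_id: &str, generation: u32) -> anyhow::Result<()> {
    let client = reqwest::blocking::Client::new();
    let url = format!("http://127.0.0.1:9898/v1/tenant/{tenant_id}/attach");
    let response = client
        .post(url)
        .json(&json!({
            "config": {},              // tenant config overrides; empty means defaults
            "generation": generation,  // mandatory once control_plane_api is set
        }))
        .send()?;
    anyhow::ensure!(
        response.status().is_success(),
        "attach failed: {}",
        response.status()
    );
    Ok(())
}
```

In legacy mode (`control_plane_api` unset), the `generation` field can
simply be omitted and the pageserver falls back to `Generation::none()`.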

Caveats:
- The re-attachment during startup assumes that we are only re-attaching
tenants that have previously been attached, not totally new tenants --
this relies on the control plane's attachment logic retrying until we
eventually see the attach API call. That's important because the
`/re-attach` API doesn't tell us which timelines we should attach -- we
still use local disk state for that. Ref:
https://github.com/neondatabase/neon/issues/5173
- Testing: generations are only enabled for one integration test right
now (`test_pageserver_restart`), as a smoke test that all the machinery
basically works. Writing fuller tests that stress tenant migration will
come later and will involve extending our test fixtures to deal with
multiple pageservers.
- I'm not in love with "attachment_service" as a name for the neon_local
component, but it's not very important because we can easily rename
these test bits whenever we want.
- Observability of re-attach on startup is limited: when I add
generation validation for deletions in a later PR, I want to wrap the
control plane API calls in a small client class that exposes metrics
for things like errors calling the control plane API, which will act
as a strong red signal that something is not right.

Co-authored-by: Christian Schwarz <christian@neon.tech>
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
Author: John Spray
Date: 2023-09-06 14:44:48 +01:00
Committed by: GitHub
Parent: da60f69909
Commit: 61d661a6c3
30 changed files with 1264 additions and 84 deletions

Cargo.lock generated
View File

@@ -1057,6 +1057,8 @@ dependencies = [
"comfy-table",
"compute_api",
"git-version",
"hex",
"hyper",
"nix 0.26.2",
"once_cell",
"pageserver_api",
@@ -1072,6 +1074,7 @@ dependencies = [
"storage_broker",
"tar",
"thiserror",
"tokio",
"toml",
"tracing",
"url",

View File

@@ -12,6 +12,8 @@ git-version.workspace = true
nix.workspace = true
once_cell.workspace = true
postgres.workspace = true
hex.workspace = true
hyper.workspace = true
regex.workspace = true
reqwest = { workspace = true, features = ["blocking", "json"] }
serde.workspace = true
@@ -20,6 +22,7 @@ serde_with.workspace = true
tar.workspace = true
thiserror.workspace = true
toml.workspace = true
tokio.workspace = true
url.workspace = true
# Note: Do not directly depend on pageserver or safekeeper; use pageserver_api or safekeeper_api
# instead, so that recompile times are better.

View File

@@ -0,0 +1,106 @@
use crate::{background_process, local_env::LocalEnv};
use anyhow::anyhow;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use std::{path::PathBuf, process::Child};
use utils::id::{NodeId, TenantId};
pub struct AttachmentService {
env: LocalEnv,
listen: String,
path: PathBuf,
}
const COMMAND: &str = "attachment_service";
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct AttachHookRequest {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
pub pageserver_id: Option<NodeId>,
}
#[derive(Serialize, Deserialize)]
pub struct AttachHookResponse {
pub gen: Option<u32>,
}
impl AttachmentService {
pub fn from_env(env: &LocalEnv) -> Self {
let path = env.base_data_dir.join("attachments.json");
// Makes no sense to construct this if pageservers aren't going to use it: assume
// pageservers have control plane API set
let listen_url = env.pageserver.control_plane_api.clone().unwrap();
let listen = format!(
"{}:{}",
listen_url.host_str().unwrap(),
listen_url.port().unwrap()
);
Self {
env: env.clone(),
path,
listen,
}
}
fn pid_file(&self) -> PathBuf {
self.env.base_data_dir.join("attachment_service.pid")
}
pub fn start(&self) -> anyhow::Result<Child> {
let path_str = self.path.to_string_lossy();
background_process::start_process(
COMMAND,
&self.env.base_data_dir,
&self.env.attachment_service_bin(),
["-l", &self.listen, "-p", &path_str],
[],
background_process::InitialPidFile::Create(&self.pid_file()),
// TODO: a real status check
|| Ok(true),
)
}
pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
background_process::stop_process(immediate, COMMAND, &self.pid_file())
}
/// Call into the attach_hook API, for use before handing out attachments to pageservers
pub fn attach_hook(
&self,
tenant_id: TenantId,
pageserver_id: NodeId,
) -> anyhow::Result<Option<u32>> {
use hyper::StatusCode;
let url = self
.env
.pageserver
.control_plane_api
.clone()
.unwrap()
.join("attach_hook")
.unwrap();
let client = reqwest::blocking::ClientBuilder::new()
.build()
.expect("Failed to construct http client");
let request = AttachHookRequest {
tenant_id,
pageserver_id: Some(pageserver_id),
};
let response = client.post(url).json(&request).send()?;
if response.status() != StatusCode::OK {
return Err(anyhow!("Unexpected status {}", response.status()));
}
let response = response.json::<AttachHookResponse>()?;
Ok(response.gen)
}
}

View File

@@ -0,0 +1,273 @@
/// The attachment service mimics the aspects of the control plane API
/// that are required for a pageserver to operate.
///
/// This enables running & testing pageservers without a full-blown
/// deployment of the Neon cloud platform.
///
use anyhow::anyhow;
use clap::Parser;
use hex::FromHex;
use hyper::StatusCode;
use hyper::{Body, Request, Response};
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::{collections::HashMap, sync::Arc};
use utils::logging::{self, LogFormat};
use utils::{
http::{
endpoint::{self},
error::ApiError,
json::{json_request, json_response},
RequestExt, RouterBuilder,
},
id::{NodeId, TenantId},
tcp_listener,
};
use pageserver_api::control_api::{
ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest, ValidateResponse,
ValidateResponseTenant,
};
use control_plane::attachment_service::{AttachHookRequest, AttachHookResponse};
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
#[command(arg_required_else_help(true))]
struct Cli {
/// Host and port to listen on, like `127.0.0.1:1234`
#[arg(short, long)]
listen: std::net::SocketAddr,
/// Path to the .json file to store state (will be created if it doesn't exist)
#[arg(short, long)]
path: PathBuf,
}
// The persistent state of each Tenant
#[derive(Serialize, Deserialize, Clone)]
struct TenantState {
// Currently attached pageserver
pageserver: Option<NodeId>,
// Latest generation number: next time we attach, increment this
// and use the incremented number when attaching
generation: u32,
}
fn to_hex_map<S, V>(input: &HashMap<TenantId, V>, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
V: Clone + Serialize,
{
let transformed = input.iter().map(|(k, v)| (hex::encode(k), v.clone()));
transformed
.collect::<HashMap<String, V>>()
.serialize(serializer)
}
fn from_hex_map<'de, D, V>(deserializer: D) -> Result<HashMap<TenantId, V>, D::Error>
where
D: serde::de::Deserializer<'de>,
V: Deserialize<'de>,
{
let hex_map = HashMap::<String, V>::deserialize(deserializer)?;
hex_map
.into_iter()
.map(|(k, v)| {
TenantId::from_hex(k)
.map(|k| (k, v))
.map_err(serde::de::Error::custom)
})
.collect()
}
// Top level state available to all HTTP handlers
#[derive(Serialize, Deserialize)]
struct PersistentState {
#[serde(serialize_with = "to_hex_map", deserialize_with = "from_hex_map")]
tenants: HashMap<TenantId, TenantState>,
#[serde(skip)]
path: PathBuf,
}
impl PersistentState {
async fn save(&self) -> anyhow::Result<()> {
let bytes = serde_json::to_vec(self)?;
tokio::fs::write(&self.path, &bytes).await?;
Ok(())
}
async fn load(path: &Path) -> anyhow::Result<Self> {
let bytes = tokio::fs::read(path).await?;
let mut decoded = serde_json::from_slice::<Self>(&bytes)?;
decoded.path = path.to_owned();
Ok(decoded)
}
async fn load_or_new(path: &Path) -> Self {
match Self::load(path).await {
Ok(s) => {
tracing::info!("Loaded state file at {}", path.display());
s
}
Err(e)
if e.downcast_ref::<std::io::Error>()
.map(|e| e.kind() == std::io::ErrorKind::NotFound)
.unwrap_or(false) =>
{
tracing::info!("Will create state file at {}", path.display());
Self {
tenants: HashMap::new(),
path: path.to_owned(),
}
}
Err(e) => {
panic!("Failed to load state from '{}': {e:#} (maybe your .neon/ dir was written by an older version?)", path.display())
}
}
}
}
/// State available to HTTP request handlers
#[derive(Clone)]
struct State {
inner: Arc<tokio::sync::RwLock<PersistentState>>,
}
impl State {
fn new(persistent_state: PersistentState) -> State {
Self {
inner: Arc::new(tokio::sync::RwLock::new(persistent_state)),
}
}
}
#[inline(always)]
fn get_state(request: &Request<Body>) -> &State {
request
.data::<Arc<State>>()
.expect("unknown state type")
.as_ref()
}
/// Pageserver calls into this on startup, to learn which tenants it should attach
async fn handle_re_attach(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
let reattach_req = json_request::<ReAttachRequest>(&mut req).await?;
let state = get_state(&req).inner.clone();
let mut locked = state.write().await;
let mut response = ReAttachResponse {
tenants: Vec::new(),
};
for (t, state) in &mut locked.tenants {
if state.pageserver == Some(reattach_req.node_id) {
state.generation += 1;
response.tenants.push(ReAttachResponseTenant {
id: *t,
generation: state.generation,
});
}
}
locked.save().await.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, response)
}
/// Pageserver calls into this before doing deletions, to confirm that it still
/// holds the latest generation for the tenants with deletions enqueued
async fn handle_validate(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
let validate_req = json_request::<ValidateRequest>(&mut req).await?;
let locked = get_state(&req).inner.read().await;
let mut response = ValidateResponse {
tenants: Vec::new(),
};
for req_tenant in validate_req.tenants {
if let Some(tenant_state) = locked.tenants.get(&req_tenant.id) {
let valid = tenant_state.generation == req_tenant.gen;
response.tenants.push(ValidateResponseTenant {
id: req_tenant.id,
valid,
});
}
}
json_response(StatusCode::OK, response)
}
/// Call into this before attaching a tenant to a pageserver, to acquire a generation number
/// (in the real control plane this is unnecessary, because the same program is managing
/// generation numbers and doing attachments).
async fn handle_attach_hook(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
let attach_req = json_request::<AttachHookRequest>(&mut req).await?;
let state = get_state(&req).inner.clone();
let mut locked = state.write().await;
let tenant_state = locked
.tenants
.entry(attach_req.tenant_id)
.or_insert_with(|| TenantState {
pageserver: attach_req.pageserver_id,
generation: 0,
});
if attach_req.pageserver_id.is_some() {
tenant_state.generation += 1;
}
let generation = tenant_state.generation;
locked.save().await.map_err(ApiError::InternalServerError)?;
json_response(
StatusCode::OK,
AttachHookResponse {
gen: attach_req.pageserver_id.map(|_| generation),
},
)
}
fn make_router(persistent_state: PersistentState) -> RouterBuilder<hyper::Body, ApiError> {
endpoint::make_router()
.data(Arc::new(State::new(persistent_state)))
.post("/re-attach", handle_re_attach)
.post("/validate", handle_validate)
.post("/attach_hook", handle_attach_hook)
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
logging::init(
LogFormat::Plain,
logging::TracingErrorLayerEnablement::Disabled,
)?;
let args = Cli::parse();
tracing::info!(
"Starting, state at {}, listening on {}",
args.path.to_string_lossy(),
args.listen
);
let persistent_state = PersistentState::load_or_new(&args.path).await;
let http_listener = tcp_listener::bind(args.listen)?;
let router = make_router(persistent_state)
.build()
.map_err(|err| anyhow!(err))?;
let service = utils::http::RouterService::new(router).unwrap();
let server = hyper::Server::from_tcp(http_listener)?.serve(service);
tracing::info!("Serving on {0}", args.listen);
server.await?;
Ok(())
}

View File

@@ -8,6 +8,7 @@
use anyhow::{anyhow, bail, Context, Result};
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
use compute_api::spec::ComputeMode;
use control_plane::attachment_service::AttachmentService;
use control_plane::endpoint::ComputeControlPlane;
use control_plane::local_env::LocalEnv;
use control_plane::pageserver::PageServerNode;
@@ -43,6 +44,8 @@ project_git_version!(GIT_VERSION);
const DEFAULT_PG_VERSION: &str = "15";
const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/";
fn default_conf() -> String {
format!(
r#"
@@ -56,11 +59,13 @@ listen_pg_addr = '{DEFAULT_PAGESERVER_PG_ADDR}'
listen_http_addr = '{DEFAULT_PAGESERVER_HTTP_ADDR}'
pg_auth_type = '{trust_auth}'
http_auth_type = '{trust_auth}'
control_plane_api = '{DEFAULT_PAGESERVER_CONTROL_PLANE_API}'
[[safekeepers]]
id = {DEFAULT_SAFEKEEPER_ID}
pg_port = {DEFAULT_SAFEKEEPER_PG_PORT}
http_port = {DEFAULT_SAFEKEEPER_HTTP_PORT}
"#,
trust_auth = AuthType::Trust,
)
@@ -107,6 +112,7 @@ fn main() -> Result<()> {
"start" => handle_start_all(sub_args, &env),
"stop" => handle_stop_all(sub_args, &env),
"pageserver" => handle_pageserver(sub_args, &env),
"attachment_service" => handle_attachment_service(sub_args, &env),
"safekeeper" => handle_safekeeper(sub_args, &env),
"endpoint" => handle_endpoint(sub_args, &env),
"pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"),
@@ -342,13 +348,25 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
}
}
Some(("create", create_match)) => {
let initial_tenant_id = parse_tenant_id(create_match)?;
let tenant_conf: HashMap<_, _> = create_match
.get_many::<String>("config")
.map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
.unwrap_or_default();
let new_tenant_id = pageserver.tenant_create(initial_tenant_id, tenant_conf)?;
println!("tenant {new_tenant_id} successfully created on the pageserver");
// If tenant ID was not specified, generate one
let tenant_id = parse_tenant_id(create_match)?.unwrap_or_else(TenantId::generate);
let generation = if env.pageserver.control_plane_api.is_some() {
// We must register the tenant with the attachment service, so
// that when the pageserver restarts, it will be re-attached.
let attachment_service = AttachmentService::from_env(env);
attachment_service.attach_hook(tenant_id, env.pageserver.id)?
} else {
None
};
pageserver.tenant_create(tenant_id, generation, tenant_conf)?;
println!("tenant {tenant_id} successfully created on the pageserver");
// Create an initial timeline for the new tenant
let new_timeline_id = parse_timeline_id(create_match)?;
@@ -358,7 +376,7 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
.context("Failed to parse postgres version from the argument string")?;
let timeline_info = pageserver.timeline_create(
new_tenant_id,
tenant_id,
new_timeline_id,
None,
None,
@@ -369,17 +387,17 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
env.register_branch_mapping(
DEFAULT_BRANCH_NAME.to_string(),
new_tenant_id,
tenant_id,
new_timeline_id,
)?;
println!(
"Created an initial timeline '{new_timeline_id}' at Lsn {last_record_lsn} for tenant: {new_tenant_id}",
"Created an initial timeline '{new_timeline_id}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
);
if create_match.get_flag("set-default") {
println!("Setting tenant {new_tenant_id} as a default one");
env.default_tenant_id = Some(new_tenant_id);
println!("Setting tenant {tenant_id} as a default one");
env.default_tenant_id = Some(tenant_id);
}
}
Some(("set-default", set_default_match)) => {
@@ -817,6 +835,33 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
Ok(())
}
fn handle_attachment_service(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let svc = AttachmentService::from_env(env);
match sub_match.subcommand() {
Some(("start", _start_match)) => {
if let Err(e) = svc.start() {
eprintln!("start failed: {e}");
exit(1);
}
}
Some(("stop", stop_match)) => {
let immediate = stop_match
.get_one::<String>("stop-mode")
.map(|s| s.as_str())
== Some("immediate");
if let Err(e) = svc.stop(immediate) {
eprintln!("stop failed: {}", e);
exit(1);
}
}
Some((sub_name, _)) => bail!("Unexpected attachment_service subcommand '{}'", sub_name),
None => bail!("no attachment_service subcommand provided"),
}
Ok(())
}
fn get_safekeeper(env: &local_env::LocalEnv, id: NodeId) -> Result<SafekeeperNode> {
if let Some(node) = env.safekeepers.iter().find(|node| node.id == id) {
Ok(SafekeeperNode::from_env(env, node))
@@ -897,6 +942,16 @@ fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow
broker::start_broker_process(env)?;
// Only start the attachment service if the pageserver is configured to need it
if env.pageserver.control_plane_api.is_some() {
let attachment_service = AttachmentService::from_env(env);
if let Err(e) = attachment_service.start() {
eprintln!("attachment_service start failed: {:#}", e);
try_stop_all(env, true);
exit(1);
}
}
let pageserver = PageServerNode::from_env(env);
if let Err(e) = pageserver.start(&pageserver_config_overrides(sub_match)) {
eprintln!("pageserver {} start failed: {:#}", env.pageserver.id, e);
@@ -955,6 +1010,13 @@ fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
if let Err(e) = broker::stop_broker_process(env) {
eprintln!("neon broker stop failed: {e:#}");
}
if env.pageserver.control_plane_api.is_some() {
let attachment_service = AttachmentService::from_env(env);
if let Err(e) = attachment_service.stop(immediate) {
eprintln!("attachment service stop failed: {e:#}");
}
}
}
fn cli() -> Command {
@@ -1138,6 +1200,14 @@ fn cli() -> Command {
.arg(stop_mode_arg.clone()))
.subcommand(Command::new("restart").about("Restart local pageserver").arg(pageserver_config_args.clone()))
)
.subcommand(
Command::new("attachment_service")
.arg_required_else_help(true)
.about("Manage attachment_service")
.subcommand(Command::new("start").about("Start local pageserver").arg(pageserver_config_args.clone()))
.subcommand(Command::new("stop").about("Stop local pageserver")
.arg(stop_mode_arg.clone()))
)
.subcommand(
Command::new("safekeeper")
.arg_required_else_help(true)

View File

@@ -7,6 +7,7 @@
// local installations.
//
pub mod attachment_service;
mod background_process;
pub mod broker;
pub mod endpoint;

View File

@@ -118,6 +118,9 @@ pub struct PageServerConf {
// auth type used for the PG and HTTP ports
pub pg_auth_type: AuthType,
pub http_auth_type: AuthType,
// Control plane location
pub control_plane_api: Option<Url>,
}
impl Default for PageServerConf {
@@ -128,6 +131,7 @@ impl Default for PageServerConf {
listen_http_addr: String::new(),
pg_auth_type: AuthType::Trust,
http_auth_type: AuthType::Trust,
control_plane_api: None,
}
}
}
@@ -202,6 +206,10 @@ impl LocalEnv {
self.neon_distrib_dir.join("pageserver")
}
pub fn attachment_service_bin(&self) -> PathBuf {
self.neon_distrib_dir.join("attachment_service")
}
pub fn safekeeper_bin(&self) -> PathBuf {
self.neon_distrib_dir.join("safekeeper")
}

View File

@@ -126,6 +126,13 @@ impl PageServerNode {
broker_endpoint_param,
];
if let Some(control_plane_api) = &self.env.pageserver.control_plane_api {
overrides.push(format!(
"control_plane_api='{}'",
control_plane_api.as_str()
));
}
if self.env.pageserver.http_auth_type != AuthType::Trust
|| self.env.pageserver.pg_auth_type != AuthType::Trust
{
@@ -316,7 +323,8 @@ impl PageServerNode {
pub fn tenant_create(
&self,
new_tenant_id: Option<TenantId>,
new_tenant_id: TenantId,
generation: Option<u32>,
settings: HashMap<&str, &str>,
) -> anyhow::Result<TenantId> {
let mut settings = settings.clone();
@@ -382,11 +390,9 @@ impl PageServerNode {
.context("Failed to parse 'gc_feedback' as bool")?,
};
// If tenant ID was not specified, generate one
let new_tenant_id = new_tenant_id.unwrap_or(TenantId::generate());
let request = models::TenantCreateRequest {
new_tenant_id,
generation,
config,
};
if !settings.is_empty() {

View File

@@ -0,0 +1,52 @@
//! Types in this file are for pageserver's upward-facing API calls to the control plane,
//! required for acquiring and validating tenant generation numbers.
//!
//! See docs/rfcs/025-generation-numbers.md
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::id::{NodeId, TenantId};
#[derive(Serialize, Deserialize)]
pub struct ReAttachRequest {
pub node_id: NodeId,
}
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct ReAttachResponseTenant {
#[serde_as(as = "DisplayFromStr")]
pub id: TenantId,
pub generation: u32,
}
#[derive(Serialize, Deserialize)]
pub struct ReAttachResponse {
pub tenants: Vec<ReAttachResponseTenant>,
}
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct ValidateRequestTenant {
#[serde_as(as = "DisplayFromStr")]
pub id: TenantId,
pub gen: u32,
}
#[derive(Serialize, Deserialize)]
pub struct ValidateRequest {
pub tenants: Vec<ValidateRequestTenant>,
}
#[derive(Serialize, Deserialize)]
pub struct ValidateResponse {
pub tenants: Vec<ValidateResponseTenant>,
}
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct ValidateResponseTenant {
#[serde_as(as = "DisplayFromStr")]
pub id: TenantId,
pub valid: bool,
}
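// Illustrative wire shapes for the request/response types above (values are
// made up; tenant ids serialize as hex strings via DisplayFromStr, other
// fields as plain JSON numbers/booleans -- treat the exact encodings as an
// assumption, not a spec):
//
//   ReAttachRequest:   {"node_id": 1}
//   ReAttachResponse:  {"tenants": [{"id": "<hex tenant id>", "generation": 6}]}
//   ValidateRequest:   {"tenants": [{"id": "<hex tenant id>", "gen": 6}]}
//   ValidateResponse:  {"tenants": [{"id": "<hex tenant id>", "valid": true}]}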

View File

@@ -1,6 +1,7 @@
use const_format::formatcp;
/// Public API types
pub mod control_api;
pub mod models;
pub mod reltag;

View File

@@ -194,10 +194,22 @@ pub struct TimelineCreateRequest {
pub struct TenantCreateRequest {
#[serde_as(as = "DisplayFromStr")]
pub new_tenant_id: TenantId,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub generation: Option<u32>,
#[serde(flatten)]
pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
}
#[serde_as]
#[derive(Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantLoadRequest {
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub generation: Option<u32>,
}
impl std::ops::Deref for TenantCreateRequest {
type Target = TenantConfig;
@@ -241,15 +253,6 @@ pub struct StatusResponse {
pub id: NodeId,
}
impl TenantCreateRequest {
pub fn new(new_tenant_id: TenantId) -> TenantCreateRequest {
TenantCreateRequest {
new_tenant_id,
config: TenantConfig::default(),
}
}
}
#[serde_as]
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
@@ -293,9 +296,11 @@ impl TenantConfigRequest {
}
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Deserialize)]
pub struct TenantAttachRequest {
pub config: TenantAttachConfig,
#[serde(default)]
pub generation: Option<u32>,
}
/// Newtype to enforce deny_unknown_fields on TenantConfig for

View File

@@ -148,21 +148,55 @@ impl RemoteStorage for LocalFs {
Some(folder) => folder.with_base(&self.storage_root),
None => self.storage_root.clone(),
};
let mut files = vec![];
let mut directory_queue = vec![full_path.clone()];
// If we were given a directory, we may use it as our starting point.
// Otherwise, we must go up to the parent directory. This is because
// S3 object list prefixes can be arbitrary strings, but when reading
// the local filesystem we need a directory to start calling read_dir on.
let mut initial_dir = full_path.clone();
match fs::metadata(full_path.clone()).await {
Ok(meta) => {
if !meta.is_dir() {
// It's not a directory: strip back to the parent
initial_dir.pop();
}
}
Err(e) if e.kind() == ErrorKind::NotFound => {
// It's not a file that exists: strip the prefix back to the parent directory
initial_dir.pop();
}
Err(e) => {
// Unexpected I/O error
anyhow::bail!(e)
}
}
// Note that PathBuf starts_with only considers full path segments, but
// object prefixes are arbitrary strings, so we need the strings for doing
// starts_with later.
let prefix = full_path.to_string_lossy();
let mut files = vec![];
let mut directory_queue = vec![initial_dir.clone()];
while let Some(cur_folder) = directory_queue.pop() {
let mut entries = fs::read_dir(cur_folder.clone()).await?;
while let Some(entry) = entries.next_entry().await? {
let file_name: PathBuf = entry.file_name().into();
let full_file_name = cur_folder.clone().join(&file_name);
let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
files.push(file_remote_path.clone());
if full_file_name.is_dir() {
directory_queue.push(full_file_name);
if full_file_name
.to_str()
.map(|s| s.starts_with(prefix.as_ref()))
.unwrap_or(false)
{
let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
files.push(file_remote_path.clone());
if full_file_name.is_dir() {
directory_queue.push(full_file_name);
}
}
}
}
Ok(files)
}

View File

@@ -53,6 +53,7 @@ impl Generation {
matches!(self, Self::None)
}
#[track_caller]
pub fn get_suffix(&self) -> String {
match self {
Self::Valid(v) => {
@@ -64,6 +65,30 @@ impl Generation {
}
}
}
/// `suffix` is the part after "-" in a key
///
/// Returns None if parsing was unsuccessful
pub fn parse_suffix(suffix: &str) -> Option<Generation> {
u32::from_str_radix(suffix, 16).map(Generation::new).ok()
}
#[track_caller]
pub fn previous(&self) -> Generation {
match self {
Self::Valid(n) => {
if *n == 0 {
// Since a tenant may be upgraded from a pre-generations state, interpret the
// generation previous to 0 as being "no generation".
Self::None
} else {
Self::Valid(n - 1)
}
}
Self::None => Self::None,
Self::Broken => panic!("Attempted to use a broken generation"),
}
}
}
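// Usage sketch (illustrative values; the on-disk suffix is whatever
// get_suffix() emits, and parse_suffix() accepts any hex string):
//   Generation::parse_suffix("00000005") == Some(Generation::new(5))
//   Generation::new(5).previous()        == Generation::new(4)
//   Generation::new(0).previous()        == Generation::none()  // pre-generations upgrade case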
impl Serialize for Generation {

View File

@@ -388,6 +388,7 @@ fn start_pageserver(
remote_storage: remote_storage.clone(),
},
order,
shutdown_pageserver.clone(),
))?;
BACKGROUND_RUNTIME.spawn({

View File

@@ -205,6 +205,8 @@ pub struct PageServerConf {
/// has it's initial logical size calculated. Not running background tasks for some seconds is
/// not terrible.
pub background_task_maximum_delay: Duration,
pub control_plane_api: Option<Url>,
}
/// We do not want to store this in a PageServerConf because the latter may be logged
@@ -279,6 +281,8 @@ struct PageServerConfigBuilder {
ondemand_download_behavior_treat_error_as_warn: BuilderValue<bool>,
background_task_maximum_delay: BuilderValue<Duration>,
control_plane_api: BuilderValue<Option<Url>>,
}
impl Default for PageServerConfigBuilder {
@@ -341,6 +345,8 @@ impl Default for PageServerConfigBuilder {
DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY,
)
.unwrap()),
control_plane_api: Set(None),
}
}
}
@@ -469,6 +475,10 @@ impl PageServerConfigBuilder {
self.background_task_maximum_delay = BuilderValue::Set(delay);
}
pub fn control_plane_api(&mut self, api: Url) {
self.control_plane_api = BuilderValue::Set(Some(api))
}
pub fn build(self) -> anyhow::Result<PageServerConf> {
let concurrent_tenant_size_logical_size_queries = self
.concurrent_tenant_size_logical_size_queries
@@ -554,6 +564,9 @@ impl PageServerConfigBuilder {
background_task_maximum_delay: self
.background_task_maximum_delay
.ok_or(anyhow!("missing background_task_maximum_delay"))?,
control_plane_api: self
.control_plane_api
.ok_or(anyhow!("missing control_plane_api"))?,
})
}
}
@@ -742,6 +755,7 @@ impl PageServerConf {
},
"ondemand_download_behavior_treat_error_as_warn" => builder.ondemand_download_behavior_treat_error_as_warn(parse_toml_bool(key, item)?),
"background_task_maximum_delay" => builder.background_task_maximum_delay(parse_toml_duration(key, item)?),
"control_plane_api" => builder.control_plane_api(parse_toml_string(key, item)?.parse().context("failed to parse control plane URL")?),
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
@@ -910,6 +924,7 @@ impl PageServerConf {
test_remote_failures: 0,
ondemand_download_behavior_treat_error_as_warn: false,
background_task_maximum_delay: Duration::ZERO,
control_plane_api: None,
}
}
}
@@ -1133,6 +1148,7 @@ background_task_maximum_delay = '334 s'
background_task_maximum_delay: humantime::parse_duration(
defaults::DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY
)?,
control_plane_api: None
},
"Correct defaults should be used when no config values are provided"
);
@@ -1188,6 +1204,7 @@ background_task_maximum_delay = '334 s'
test_remote_failures: 0,
ondemand_download_behavior_treat_error_as_warn: false,
background_task_maximum_delay: Duration::from_secs(334),
control_plane_api: None
},
"Should be able to parse all basic config values correctly"
);

View File

@@ -0,0 +1,119 @@
use std::collections::HashMap;
use hyper::StatusCode;
use pageserver_api::control_api::{ReAttachRequest, ReAttachResponse};
use tokio_util::sync::CancellationToken;
use url::Url;
use utils::{
backoff,
generation::Generation,
id::{NodeId, TenantId},
};
use crate::config::PageServerConf;
// Backoffs when control plane requests do not succeed: compromise between reducing load
// on control plane, and retrying frequently when we are blocked on a control plane
// response to make progress.
const BACKOFF_INCREMENT: f64 = 0.1;
const BACKOFF_MAX: f64 = 10.0;
/// The Pageserver's client for using the control plane API: this is a small subset
/// of the overall control plane API, for dealing with generations (see docs/rfcs/025-generation-numbers.md)
pub(crate) struct ControlPlaneClient {
http_client: reqwest::Client,
base_url: Url,
node_id: NodeId,
cancel: CancellationToken,
}
impl ControlPlaneClient {
/// A None return value indicates that the input `conf` object does not have control
/// plane API enabled.
pub(crate) fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Option<Self> {
let mut url = match conf.control_plane_api.as_ref() {
Some(u) => u.clone(),
None => return None,
};
if let Ok(mut segs) = url.path_segments_mut() {
// This ensures that `url` ends with a slash if it doesn't already.
// That way, we can subsequently use join() to safely attach extra path elements.
segs.pop_if_empty().push("");
}
let client = reqwest::ClientBuilder::new()
.build()
.expect("Failed to construct http client");
Some(Self {
http_client: client,
base_url: url,
node_id: conf.id,
cancel: cancel.clone(),
})
}
async fn try_re_attach(
&self,
url: Url,
request: &ReAttachRequest,
) -> anyhow::Result<ReAttachResponse> {
match self.http_client.post(url).json(request).send().await {
Err(e) => Err(anyhow::Error::from(e)),
Ok(r) => {
if r.status() == StatusCode::OK {
r.json::<ReAttachResponse>()
.await
.map_err(anyhow::Error::from)
} else {
Err(anyhow::anyhow!("Unexpected status {}", r.status()))
}
}
}
}
/// Block until we get a successful response
pub(crate) async fn re_attach(&self) -> anyhow::Result<HashMap<TenantId, Generation>> {
let re_attach_path = self
.base_url
.join("re-attach")
.expect("Failed to build re-attach path");
let request = ReAttachRequest {
node_id: self.node_id,
};
let mut attempt = 0;
loop {
let result = self.try_re_attach(re_attach_path.clone(), &request).await;
match result {
Ok(res) => {
tracing::info!(
"Received re-attach response with {} tenants",
res.tenants.len()
);
return Ok(res
.tenants
.into_iter()
.map(|t| (t.id, Generation::new(t.generation)))
.collect::<HashMap<_, _>>());
}
Err(e) => {
tracing::error!("Error re-attaching tenants, retrying: {e:#}");
backoff::exponential_backoff(
attempt,
BACKOFF_INCREMENT,
BACKOFF_MAX,
&self.cancel,
)
.await;
if self.cancel.is_cancelled() {
return Err(anyhow::anyhow!("Shutting down"));
}
attempt += 1;
}
}
}
}
}

View File

@@ -383,7 +383,6 @@ paths:
schema:
type: string
format: hex
post:
description: |
Schedules attach operation to happen in the background for the given tenant.
@@ -1020,6 +1019,9 @@ components:
properties:
config:
$ref: '#/components/schemas/TenantConfig'
generation:
type: integer
description: Attachment generation number.
TenantConfigRequest:
allOf:
- $ref: '#/components/schemas/TenantConfig'

View File

@@ -8,7 +8,9 @@ use anyhow::{anyhow, Context, Result};
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::{DownloadRemoteLayersTaskSpawnRequest, TenantAttachRequest};
use pageserver_api::models::{
DownloadRemoteLayersTaskSpawnRequest, TenantAttachRequest, TenantLoadRequest,
};
use remote_storage::GenericRemoteStorage;
use storage_broker::BrokerClientChannel;
use tenant_size_model::{SizeResult, StorageModel};
@@ -32,11 +34,13 @@ use crate::tenant::mgr::{
};
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
use crate::tenant::timeline::Timeline;
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
use crate::{config::PageServerConf, tenant::mgr};
use crate::{disk_usage_eviction_task, tenant};
use utils::{
auth::JwtAuth,
generation::Generation,
http::{
endpoint::{self, attach_openapi_ui, auth_middleware, check_permission_with},
error::{ApiError, HttpErrorBody},
@@ -472,7 +476,7 @@ async fn tenant_attach_handler(
check_permission(&request, Some(tenant_id))?;
let maybe_body: Option<TenantAttachRequest> = json_request_or_empty_body(&mut request).await?;
let tenant_conf = match maybe_body {
let tenant_conf = match &maybe_body {
Some(request) => TenantConfOpt::try_from(&*request.config).map_err(ApiError::BadRequest)?,
None => TenantConfOpt::default(),
};
@@ -483,10 +487,13 @@ async fn tenant_attach_handler(
let state = get_state(&request);
let generation = get_request_generation(state, maybe_body.as_ref().and_then(|r| r.generation))?;
if let Some(remote_storage) = &state.remote_storage {
mgr::attach_tenant(
state.conf,
tenant_id,
generation,
tenant_conf,
state.broker_client.clone(),
remote_storage.clone(),
@@ -538,7 +545,7 @@ async fn tenant_detach_handler(
}
async fn tenant_load_handler(
request: Request<Body>,
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
@@ -546,10 +553,18 @@ async fn tenant_load_handler(
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let maybe_body: Option<TenantLoadRequest> = json_request_or_empty_body(&mut request).await?;
let state = get_state(&request);
// The /load request is only usable when control_plane_api is not set. Once it is set, callers
// should always use /attach instead.
let generation = get_request_generation(state, maybe_body.as_ref().and_then(|r| r.generation))?;
mgr::load_tenant(
state.conf,
tenant_id,
generation,
state.broker_client.clone(),
state.remote_storage.clone(),
&ctx,
@@ -851,6 +866,21 @@ pub fn html_response(status: StatusCode, data: String) -> Result<Response<Body>,
Ok(response)
}
/// Helper for requests that may take a generation, which is mandatory
/// when control_plane_api is set, but otherwise defaults to Generation::none()
fn get_request_generation(state: &State, req_gen: Option<u32>) -> Result<Generation, ApiError> {
if state.conf.control_plane_api.is_some() {
req_gen
.map(Generation::new)
.ok_or(ApiError::BadRequest(anyhow!(
"generation attribute missing"
)))
} else {
// Legacy mode: all tenants operate with no generation
Ok(Generation::none())
}
}
async fn tenant_create_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
@@ -867,14 +897,17 @@ async fn tenant_create_handler(
let tenant_conf =
TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let state = get_state(&request);
let generation = get_request_generation(state, request_data.generation)?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let new_tenant = mgr::create_tenant(
state.conf,
tenant_conf,
target_tenant_id,
generation,
state.broker_client.clone(),
state.remote_storage.clone(),
&ctx,

View File

@@ -3,6 +3,7 @@ pub mod basebackup;
pub mod config;
pub mod consumption_metrics;
pub mod context;
mod control_plane_client;
pub mod disk_usage_eviction_task;
pub mod http;
pub mod import_datadir;

View File

@@ -3381,7 +3381,7 @@ pub mod harness {
pub tenant_conf: TenantConf,
pub tenant_id: TenantId,
pub generation: Generation,
remote_storage: GenericRemoteStorage,
pub remote_storage: GenericRemoteStorage,
pub remote_fs_dir: PathBuf,
}

View File

@@ -230,6 +230,23 @@ impl TimelineMetadata {
pub fn pg_version(&self) -> u32 {
self.body.pg_version
}
// Checksums make it awkward to build a valid instance by hand. This helper
// provides a TimelineMetadata with a valid checksum in its header.
#[cfg(test)]
pub fn example() -> Self {
let instance = Self::new(
"0/16960E8".parse::<Lsn>().unwrap(),
None,
None,
Lsn::from_hex("00000000").unwrap(),
Lsn::from_hex("00000000").unwrap(),
Lsn::from_hex("00000000").unwrap(),
0,
);
let bytes = instance.to_bytes().unwrap();
Self::from_bytes(&bytes).unwrap()
}
}
impl<'de> Deserialize<'de> for TimelineMetadata {

View File

@@ -11,6 +11,7 @@ use anyhow::Context;
use once_cell::sync::Lazy;
use tokio::sync::RwLock;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::*;
use remote_storage::GenericRemoteStorage;
@@ -18,6 +19,7 @@ use utils::crashsafe;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::control_plane_client::ControlPlaneClient;
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::TenantConfOpt;
use crate::tenant::delete::DeleteTenantFlow;
@@ -94,12 +96,21 @@ pub async fn init_tenant_mgr(
conf: &'static PageServerConf,
resources: TenantSharedResources,
init_order: InitializationOrder,
cancel: CancellationToken,
) -> anyhow::Result<()> {
// Scan local filesystem for attached tenants
let tenants_dir = conf.tenants_path();
let mut tenants = HashMap::new();
// If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
let tenant_generations = if let Some(client) = ControlPlaneClient::new(conf, &cancel) {
Some(client.re_attach().await?)
} else {
info!("Control plane API not configured, tenant generations are disabled");
None
};
let mut dir_entries = fs::read_dir(&tenants_dir)
.await
.with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
@@ -149,9 +160,53 @@ pub async fn init_tenant_mgr(
continue;
}
let tenant_id = match tenant_dir_path
.file_name()
.and_then(OsStr::to_str)
.unwrap_or_default()
.parse::<TenantId>()
{
Ok(id) => id,
Err(_) => {
warn!(
"Invalid tenant path (garbage in our repo directory?): {}",
tenant_dir_path.display()
);
continue;
}
};
let generation = if let Some(generations) = &tenant_generations {
// We have a generation map: treat it as the authority for whether
// this tenant is really attached.
if let Some(gen) = generations.get(&tenant_id) {
*gen
} else {
info!("Detaching tenant {tenant_id}, control plane omitted it in re-attach response");
if let Err(e) = safe_remove_tenant_dir_all(&tenant_dir_path).await {
error!(
"Failed to remove detached tenant directory '{}': {:?}",
tenant_dir_path.display(),
e
);
}
continue;
}
} else {
// Legacy mode: no generation information, any tenant present
// on local disk may activate
info!(
"Starting tenant {} in legacy mode, no generation",
tenant_dir_path.display()
);
Generation::none()
};
match schedule_local_tenant_processing(
conf,
tenant_id,
&tenant_dir_path,
generation,
resources.clone(),
Some(init_order.clone()),
&TENANTS,
@@ -185,9 +240,12 @@ pub async fn init_tenant_mgr(
Ok(())
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn schedule_local_tenant_processing(
conf: &'static PageServerConf,
tenant_id: TenantId,
tenant_path: &Path,
generation: Generation,
resources: TenantSharedResources,
init_order: Option<InitializationOrder>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
@@ -208,15 +266,6 @@ pub(crate) fn schedule_local_tenant_processing(
"Cannot load tenant from empty directory {tenant_path:?}"
);
let tenant_id = tenant_path
.file_name()
.and_then(OsStr::to_str)
.unwrap_or_default()
.parse::<TenantId>()
.with_context(|| {
format!("Could not parse tenant id out of the tenant dir name in path {tenant_path:?}")
})?;
let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id);
anyhow::ensure!(
!conf.tenant_ignore_mark_file_path(&tenant_id).exists(),
@@ -229,7 +278,7 @@ pub(crate) fn schedule_local_tenant_processing(
match Tenant::spawn_attach(
conf,
tenant_id,
Generation::none(),
generation,
resources.broker_client,
tenants,
remote_storage,
@@ -253,13 +302,7 @@ pub(crate) fn schedule_local_tenant_processing(
info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
// Start loading the tenant into memory. It will initially be in Loading state.
Tenant::spawn_load(
conf,
tenant_id,
Generation::none(),
resources,
init_order,
tenants,
ctx,
conf, tenant_id, generation, resources, init_order, tenants, ctx,
)
};
Ok(tenant)
@@ -383,6 +426,7 @@ pub async fn create_tenant(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
tenant_id: TenantId,
generation: Generation,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
ctx: &RequestContext,
@@ -400,7 +444,8 @@ pub async fn create_tenant(
remote_storage,
};
let created_tenant =
schedule_local_tenant_processing(conf, &tenant_directory, tenant_resources, None, &TENANTS, ctx)?;
schedule_local_tenant_processing(conf, tenant_id, &tenant_directory,
generation, tenant_resources, None, &TENANTS, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
@@ -548,6 +593,7 @@ async fn detach_tenant0(
pub async fn load_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
generation: Generation,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
ctx: &RequestContext,
@@ -564,7 +610,7 @@ pub async fn load_tenant(
broker_client,
remote_storage,
};
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, resources, None, &TENANTS, ctx)
let new_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_path, generation, resources, None, &TENANTS, ctx)
.with_context(|| {
format!("Failed to schedule tenant processing in path {tenant_path:?}")
})?;
@@ -628,6 +674,7 @@ pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapLis
pub async fn attach_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
generation: Generation,
tenant_conf: TenantConfOpt,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: GenericRemoteStorage,
@@ -649,7 +696,7 @@ pub async fn attach_tenant(
broker_client,
remote_storage: Some(remote_storage),
};
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, resources, None, &TENANTS, ctx)?;
let attached_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_dir, generation, resources, None, &TENANTS, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233

View File

@@ -1430,6 +1430,30 @@ pub fn remote_index_path(
.expect("Failed to construct path")
}
/// Given the key of an index, parse out the generation part of the name
pub(crate) fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
let file_name = match path.get_path().file_name() {
Some(f) => f,
None => {
// Unexpected: we should be seeing index_part.json paths only
tracing::warn!("Malformed index key {}", path);
return None;
}
};
let file_name_str = match file_name.to_str() {
Some(s) => s,
None => {
tracing::warn!("Malformed index key {:?}", path);
return None;
}
};
match file_name_str.split_once('-') {
Some((_, gen_suffix)) => Generation::parse_suffix(gen_suffix),
None => None,
}
}
/// Files on the remote storage are stored with paths, relative to the workdir.
/// That path includes in itself both tenant and timeline ids, allowing to have a unique remote storage path.
///
@@ -1546,6 +1570,33 @@ mod tests {
tenant_ctx: ctx,
})
}
/// Construct a RemoteTimelineClient in an arbitrary generation
fn build_client(&self, generation: Generation) -> Arc<RemoteTimelineClient> {
Arc::new(RemoteTimelineClient {
conf: self.harness.conf,
runtime: tokio::runtime::Handle::current(),
tenant_id: self.harness.tenant_id,
timeline_id: TIMELINE_ID,
generation,
storage_impl: self.harness.remote_storage.clone(),
upload_queue: Mutex::new(UploadQueue::Uninitialized),
metrics: Arc::new(RemoteTimelineClientMetrics::new(
&self.harness.tenant_id,
&TIMELINE_ID,
)),
})
}
/// A tracing::Span that satisfies remote_timeline_client methods that assert tenant_id
/// and timeline_id are present.
fn span(&self) -> tracing::Span {
tracing::info_span!(
"test",
tenant_id = %self.harness.tenant_id,
timeline_id = %TIMELINE_ID
)
}
}
// Test scheduling
@@ -1565,12 +1616,16 @@ mod tests {
// Schedule another deletion. Check that it's launched immediately.
// Schedule index upload. Check that it's queued
let test_setup = TestSetup::new("upload_scheduling").await.unwrap();
let span = test_setup.span();
let _guard = span.enter();
let TestSetup {
harness,
tenant: _tenant,
timeline,
tenant_ctx: _tenant_ctx,
} = TestSetup::new("upload_scheduling").await.unwrap();
} = test_setup;
let client = timeline.remote_client.as_ref().unwrap();
@@ -1818,4 +1873,106 @@ mod tests {
};
assert_eq!(actual_c, expected_c);
}
async fn inject_index_part(test_state: &TestSetup, generation: Generation) -> IndexPart {
// An empty IndexPart, just sufficient to ensure deserialization will succeed
let example_metadata = TimelineMetadata::example();
let example_index_part = IndexPart::new(
HashMap::new(),
example_metadata.disk_consistent_lsn(),
example_metadata,
);
let index_part_bytes = serde_json::to_vec(&example_index_part).unwrap();
let timeline_path = test_state.harness.timeline_path(&TIMELINE_ID);
let remote_timeline_dir = test_state.harness.remote_fs_dir.join(
timeline_path
.strip_prefix(&test_state.harness.conf.workdir)
.unwrap(),
);
std::fs::create_dir_all(remote_timeline_dir).expect("creating test dir should work");
let index_path = test_state.harness.remote_fs_dir.join(
remote_index_path(&test_state.harness.tenant_id, &TIMELINE_ID, generation).get_path(),
);
eprintln!("Writing {}", index_path.display());
std::fs::write(&index_path, index_part_bytes).unwrap();
example_index_part
}
/// Assert that when a RemoteTimelineClient in generation `get_generation` fetches its
/// index, the IndexPart returned is equal to `expected`
async fn assert_got_index_part(
test_state: &TestSetup,
get_generation: Generation,
expected: &IndexPart,
) {
let client = test_state.build_client(get_generation);
let download_r = client
.download_index_file()
.await
.expect("download should always succeed");
assert!(matches!(download_r, MaybeDeletedIndexPart::IndexPart(_)));
match download_r {
MaybeDeletedIndexPart::IndexPart(index_part) => {
assert_eq!(&index_part, expected);
}
MaybeDeletedIndexPart::Deleted(_index_part) => panic!("Test doesn't set deleted_at"),
}
}
#[tokio::test]
async fn index_part_download_simple() -> anyhow::Result<()> {
let test_state = TestSetup::new("index_part_download_simple").await.unwrap();
let span = test_state.span();
let _guard = span.enter();
// Simple case: we are in generation N, load the index from generation N - 1
let generation_n = 5;
let injected = inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
assert_got_index_part(&test_state, Generation::new(generation_n), &injected).await;
Ok(())
}
#[tokio::test]
async fn index_part_download_ordering() -> anyhow::Result<()> {
let test_state = TestSetup::new("index_part_download_ordering")
.await
.unwrap();
let span = test_state.span();
let _guard = span.enter();
// A generation-less IndexPart exists in the bucket, we should find it
let generation_n = 5;
let injected_none = inject_index_part(&test_state, Generation::none()).await;
assert_got_index_part(&test_state, Generation::new(generation_n), &injected_none).await;
// If a more recent-than-none generation exists, we should prefer to load that
let injected_1 = inject_index_part(&test_state, Generation::new(1)).await;
assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
// If a more-recent-than-me generation exists, we should ignore it.
let _injected_10 = inject_index_part(&test_state, Generation::new(10)).await;
assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
// If a directly previous generation exists, _and_ an index exists in my own
// generation, I should prefer my own generation.
let _injected_prev =
inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
let injected_current = inject_index_part(&test_state, Generation::new(generation_n)).await;
assert_got_index_part(
&test_state,
Generation::new(generation_n),
&injected_current,
)
.await;
Ok(())
}
}

View File

@@ -24,7 +24,10 @@ use utils::crashsafe::path_with_suffix_extension;
use utils::id::{TenantId, TimelineId};
use super::index::{IndexPart, LayerFileMetadata};
use super::{remote_index_path, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES};
use super::{
parse_remote_index_path, remote_index_path, FAILED_DOWNLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
};
static MAX_DOWNLOAD_DURATION: Duration = Duration::from_secs(120);
@@ -219,13 +222,13 @@ pub async fn list_remote_timelines(
Ok(timeline_ids)
}
pub(super) async fn download_index_part(
async fn do_download_index_part(
storage: &GenericRemoteStorage,
tenant_id: &TenantId,
timeline_id: &TimelineId,
generation: Generation,
index_generation: Generation,
) -> Result<IndexPart, DownloadError> {
let remote_path = remote_index_path(tenant_id, timeline_id, generation);
let remote_path = remote_index_path(tenant_id, timeline_id, index_generation);
let index_part_bytes = download_retry(
|| async {
@@ -252,6 +255,105 @@ pub(super) async fn download_index_part(
Ok(index_part)
}
/// index_part.json objects are suffixed with a generation number, so we cannot
/// directly GET the latest index part without doing some probing.
///
/// In this function we probe for the most recent index in a generation <= our current generation.
/// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
#[tracing::instrument(skip_all, fields(generation=?my_generation))]
pub(super) async fn download_index_part(
storage: &GenericRemoteStorage,
tenant_id: &TenantId,
timeline_id: &TimelineId,
my_generation: Generation,
) -> Result<IndexPart, DownloadError> {
debug_assert_current_span_has_tenant_and_timeline_id();
if my_generation.is_none() {
// Operating without generations: just fetch the generation-less path
return do_download_index_part(storage, tenant_id, timeline_id, my_generation).await;
}
// Stale case: If we were intentionally attached in a stale generation, there may already be a remote
// index in our generation.
//
// This is an optimization to avoid doing the listing for the general case below.
let res = do_download_index_part(storage, tenant_id, timeline_id, my_generation).await;
match res {
Ok(index_part) => {
tracing::debug!(
"Found index_part from current generation (this is a stale attachment)"
);
return Ok(index_part);
}
Err(DownloadError::NotFound) => {}
Err(e) => return Err(e),
};
// Typical case: the previous generation of this tenant was running healthily, and had uploaded
// an index part. We may safely start from this index without doing a listing, because:
// - We checked for current generation case above
// - generations > my_generation are to be ignored
// - any other indices that exist would have an older generation than `previous_gen`, and
// we want to find the most recent index from a previous generation.
//
// This is an optimization to avoid doing the listing for the general case below.
let res =
do_download_index_part(storage, tenant_id, timeline_id, my_generation.previous()).await;
match res {
Ok(index_part) => {
tracing::debug!("Found index_part from previous generation");
return Ok(index_part);
}
Err(DownloadError::NotFound) => {
tracing::debug!(
"No index_part found from previous generation, falling back to listing"
);
}
Err(e) => {
return Err(e);
}
}
// General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json
// objects, and select the highest one with a generation <= my_generation.
let index_prefix = remote_index_path(tenant_id, timeline_id, Generation::none());
let indices = backoff::retry(
|| async { storage.list_files(Some(&index_prefix)).await },
|_| false,
FAILED_DOWNLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
"listing index_part files",
// TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
backoff::Cancel::new(CancellationToken::new(), || -> anyhow::Error {
unreachable!()
}),
)
.await
.map_err(DownloadError::Other)?;
// General case logic for which index to use: the latest index whose generation
// is <= our own. See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
let max_previous_generation = indices
.into_iter()
.filter_map(parse_remote_index_path)
.filter(|g| g <= &my_generation)
.max();
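// Worked example (illustrative): if the listed indices are in generations
// {none, 1, 10} and my_generation is 5, the keys that parse give {1, 10},
// only 1 passes the <= filter, so we download the generation-1 index. If no
// key <= my_generation parses, the None branch below falls back to the
// legacy generation-less path.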
match max_previous_generation {
Some(g) => {
tracing::debug!("Found index_part in generation {g:?}");
do_download_index_part(storage, tenant_id, timeline_id, g).await
}
None => {
// Migration from legacy pre-generation state: we have a generation but no prior
// attached pageserver did. Try to load from a no-generation path.
tracing::info!("No index_part.json* found");
do_download_index_part(storage, tenant_id, timeline_id, Generation::none()).await
}
}
}
/// Helper function to handle retries for a download operation.
///
/// Remote operations can fail due to rate limits (IAM, S3), spurious network

View File

@@ -1724,11 +1724,18 @@ impl Timeline {
for (name, decision) in decided {
let decision = match decision {
Ok(UseRemote { local, remote }) => {
path.push(name.file_name());
init::cleanup_local_file_for_remote(&path, &local, &remote)?;
path.pop();
UseRemote { local, remote }
// Remote is authoritative, but we may still choose to retain
// the local file if the contents appear to match
if local.file_size() == remote.file_size() {
// Use the local file, but take the remote metadata so that we pick up
// the correct generation.
UseLocal(remote)
} else {
path.push(name.file_name());
init::cleanup_local_file_for_remote(&path, &local, &remote)?;
path.pop();
UseRemote { local, remote }
}
}
Ok(decision) => decision,
Err(FutureLayer { local }) => {

View File

@@ -147,11 +147,7 @@ pub(super) fn reconcile(
Err(FutureLayer { local })
} else {
Ok(match (local, remote) {
(Some(local), Some(remote)) if local != remote => {
assert_eq!(local.generation, remote.generation);
UseRemote { local, remote }
}
(Some(local), Some(remote)) if local != remote => UseRemote { local, remote },
(Some(x), Some(_)) => UseLocal(x),
(None, Some(x)) => Evicted(x),
(Some(x), None) => NeedsUpload(x),

View File

@@ -456,6 +456,7 @@ class NeonEnvBuilder:
self.preserve_database_files = preserve_database_files
self.initial_tenant = initial_tenant or TenantId.generate()
self.initial_timeline = initial_timeline or TimelineId.generate()
self.enable_generations = False
self.scrub_on_exit = False
self.test_output_dir = test_output_dir
@@ -740,6 +741,9 @@ class NeonEnvBuilder:
sk.stop(immediate=True)
self.env.pageserver.stop(immediate=True)
if self.env.attachment_service is not None:
self.env.attachment_service.stop(immediate=True)
cleanup_error = None
if self.scrub_on_exit:
@@ -802,6 +806,8 @@ class NeonEnv:
the tenant id
"""
PAGESERVER_ID = 1
def __init__(self, config: NeonEnvBuilder):
self.repo_dir = config.repo_dir
self.rust_log_override = config.rust_log_override
@@ -825,6 +831,14 @@ class NeonEnv:
self.initial_tenant = config.initial_tenant
self.initial_timeline = config.initial_timeline
if config.enable_generations:
attachment_service_port = self.port_distributor.get_port()
self.control_plane_api: Optional[str] = f"http://127.0.0.1:{attachment_service_port}"
self.attachment_service: Optional[NeonAttachmentService] = NeonAttachmentService(self)
else:
self.control_plane_api = None
self.attachment_service = None
# Create a config file corresponding to the options
toml = textwrap.dedent(
f"""
@@ -850,7 +864,7 @@ class NeonEnv:
toml += textwrap.dedent(
f"""
[pageserver]
id=1
id={self.PAGESERVER_ID}
listen_pg_addr = 'localhost:{pageserver_port.pg}'
listen_http_addr = 'localhost:{pageserver_port.http}'
pg_auth_type = '{pg_auth_type}'
@@ -858,6 +872,13 @@ class NeonEnv:
"""
)
if self.control_plane_api is not None:
toml += textwrap.dedent(
f"""
control_plane_api = '{self.control_plane_api}'
"""
)
# Create a corresponding NeonPageserver object
self.pageserver = NeonPageserver(
self, port=pageserver_port, config_override=config.pageserver_config_override
@@ -904,6 +925,9 @@ class NeonEnv:
def start(self):
# Start up broker, pageserver and all safekeepers
self.broker.try_start()
if self.attachment_service is not None:
self.attachment_service.start()
self.pageserver.start()
for safekeeper in self.safekeepers:
@@ -1331,6 +1355,16 @@ class NeonCli(AbstractNeonCli):
res.check_returncode()
return res
def attachment_service_start(self):
cmd = ["attachment_service", "start"]
return self.raw_cli(cmd)
def attachment_service_stop(self, immediate: bool):
cmd = ["attachment_service", "stop"]
if immediate:
cmd.extend(["-m", "immediate"])
return self.raw_cli(cmd)
def pageserver_start(
self,
overrides: Tuple[str, ...] = (),
@@ -1512,6 +1546,35 @@ class ComputeCtl(AbstractNeonCli):
COMMAND = "compute_ctl"
class NeonAttachmentService:
def __init__(self, env: NeonEnv):
self.env = env
self.running = False
def start(self):
assert not self.running
self.env.neon_cli.attachment_service_start()
self.running = True
return self
def stop(self, immediate: bool = False) -> "NeonAttachmentService":
if self.running:
self.env.neon_cli.attachment_service_stop(immediate)
self.running = False
return self
def __enter__(self) -> "NeonAttachmentService":
return self
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
tb: Optional[TracebackType],
):
self.stop(immediate=True)
class NeonPageserver(PgProtocol):
"""
An object representing a running pageserver.
@@ -1675,6 +1738,26 @@ class NeonPageserver(PgProtocol):
return None
def tenant_attach(
self, tenant_id: TenantId, config: None | Dict[str, Any] = None, config_null: bool = False
):
"""
Tenant attachment passes through here to acquire a generation number before proceeding
to call into the pageserver HTTP client.
"""
if self.env.attachment_service is not None:
response = requests.post(
f"{self.env.control_plane_api}/attach_hook",
json={"tenant_id": str(tenant_id), "pageserver_id": self.env.PAGESERVER_ID},
)
response.raise_for_status()
generation = response.json()["gen"]
else:
generation = None
client = self.env.pageserver.http_client()
return client.tenant_attach(tenant_id, config, config_null, generation=generation)
def append_pageserver_param_overrides(
params_to_update: List[str],

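With generations enabled, the fixture turns tenant attachment into a two-step handshake: first ask the attachment service for a generation via `/attach_hook`, then pass that generation through to the pageserver's `/attach` endpoint. Spelled out as plain HTTP calls (the URLs and JSON field names come from the fixture code above; ports, ids and the empty config are illustrative placeholders):

```python
# Sketch of the two requests the tenant_attach wrapper issues.
import requests

control_plane_api = "http://127.0.0.1:1234"   # neon_local attachment_service
pageserver_api = "http://localhost:9898"      # pageserver HTTP listener
tenant_id = "0123456789abcdef0123456789abcdef"

# 1. Register the attachment with the attachment service and receive the
#    generation number to attach under.
hook = requests.post(
    f"{control_plane_api}/attach_hook",
    json={"tenant_id": tenant_id, "pageserver_id": 1},
)
hook.raise_for_status()
generation = hook.json()["gen"]

# 2. Attach on the pageserver, carrying the generation in the request body.
attach = requests.post(
    f"{pageserver_api}/v1/tenant/{tenant_id}/attach",
    json={"config": {}, "generation": generation},
)
attach.raise_for_status()
```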
View File

@@ -186,18 +186,25 @@ class PageserverHttpClient(requests.Session):
return TenantId(new_tenant_id)
def tenant_attach(
self, tenant_id: TenantId, config: None | Dict[str, Any] = None, config_null: bool = False
self,
tenant_id: TenantId,
config: None | Dict[str, Any] = None,
config_null: bool = False,
generation: Optional[int] = None,
):
if config_null:
assert config is None
body = "null"
body: Any = None
else:
# null-config is prohibited by the API
config = config or {}
body = json.dumps({"config": config})
body = {"config": config}
if generation is not None:
body.update({"generation": generation})
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/attach",
data=body,
data=json.dumps(body),
headers={"Content-Type": "application/json"},
)
self.verbose_error(res)

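The body handling keeps the wire format of the null-config case intact while making room for the new field: `json.dumps(None)` still produces the literal `"null"` that the old `body = "null"` sent, and a supplied generation is simply folded into the JSON object next to `config`; when no attachment service is running, `generation` stays `None` and is omitted entirely. A quick check of the resulting bodies (the tenant config key is an arbitrary illustrative value):

```python
import json

# config_null=True: serializing None reproduces the old hand-written "null" body.
assert json.dumps(None) == "null"

# Otherwise the config object is sent, with the generation folded in when given.
body = {"config": {"gc_period": "0s"}}   # illustrative tenant config
body.update({"generation": 3})
assert json.dumps(body) == '{"config": {"gc_period": "0s"}, "generation": 3}'
```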
View File

@@ -8,7 +8,9 @@ from fixtures.remote_storage import s3_storage
# Test restarting page server, while safekeeper and compute node keep
# running.
def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize("generations", [True, False])
def test_pageserver_restart(neon_env_builder: NeonEnvBuilder, generations: bool):
neon_env_builder.enable_generations = generations
neon_env_builder.enable_remote_storage(remote_storage_kind=s3_storage())
neon_env_builder.enable_scrub_on_exit()

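Enabling generations in a test is a one-line opt-in on the builder, but it has to happen before the environment is constructed, since `NeonEnv.__init__` is where the attachment service port is allocated and `control_plane_api` is written into the pageserver config. A minimal sketch of the pattern (assuming the usual `neon_env_builder` fixture and `init_start()` flow; the test name and body are hypothetical):

```python
from fixtures.neon_fixtures import NeonEnvBuilder


def test_something_with_generations(neon_env_builder: NeonEnvBuilder):
    neon_env_builder.enable_generations = True   # must precede init_start()
    env = neon_env_builder.init_start()

    # From here on, env.pageserver.tenant_attach(...) fetches a generation
    # from env.attachment_service before calling the pageserver /attach API.
```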
View File

@@ -52,9 +52,9 @@ from requests import ReadTimeout
#
# The tests are done for all types of remote storage pageserver supports.
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
@pytest.mark.parametrize("generations", [True, False])
def test_remote_storage_backup_and_restore(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind, generations: bool
):
# Use this test to check more realistic SK ids: some etcd key parsing bugs were related to them,
# and this test needs the SK to write data to the pageserver, so any such problem will be visible
@@ -64,6 +64,8 @@ def test_remote_storage_backup_and_restore(
remote_storage_kind=remote_storage_kind,
)
neon_env_builder.enable_generations = generations
# Exercise retry code path by making all uploads and downloads fail for the
# first time. The retries print INFO-messages to the log; we will check
# that they are present after the test.
@@ -154,7 +156,7 @@ def test_remote_storage_backup_and_restore(
# background task to load the tenant. In that background task,
# listing the remote timelines will fail because of the failpoint,
# and the tenant will be marked as Broken.
client.tenant_attach(tenant_id)
env.pageserver.tenant_attach(tenant_id)
tenant_info = wait_until_tenant_state(pageserver_http, tenant_id, "Broken", 15)
assert tenant_info["attachment_status"] == {
@@ -164,7 +166,7 @@ def test_remote_storage_backup_and_restore(
# Ensure that even though the tenant is broken, we can't attach it again.
with pytest.raises(Exception, match=f"tenant {tenant_id} already exists, state: Broken"):
client.tenant_attach(tenant_id)
env.pageserver.tenant_attach(tenant_id)
# Restart again, this implicitly clears the failpoint.
# test_remote_failures=1 remains active, though, as it's in the pageserver config.
@@ -182,7 +184,7 @@ def test_remote_storage_backup_and_restore(
# Ensure that the pageserver remembers that the tenant was attaching, by
# trying to attach it again. It should fail.
with pytest.raises(Exception, match=f"tenant {tenant_id} already exists, state:"):
client.tenant_attach(tenant_id)
env.pageserver.tenant_attach(tenant_id)
log.info("waiting for tenant to become active. this should be quick with on-demand download")
wait_until_tenant_active(
@@ -362,7 +364,7 @@ def test_remote_storage_upload_queue_retries(
env.pageserver.start()
client = env.pageserver.http_client()
client.tenant_attach(tenant_id)
env.pageserver.tenant_attach(tenant_id)
wait_until_tenant_active(client, tenant_id)
@@ -499,7 +501,7 @@ def test_remote_timeline_client_calls_started_metric(
env.pageserver.start()
client = env.pageserver.http_client()
client.tenant_attach(tenant_id)
env.pageserver.tenant_attach(tenant_id)
wait_until_tenant_active(client, tenant_id)