add a Rust client for pageserver mgmt api

Part of getpage@lsn benchmark epic:
https://github.com/neondatabase/neon/issues/5771

This PR moves the control plane's spread-all-over-the-place client for
the pageserver management API into a separate module within the
pageserver crate.

It also switches to the async version of reqwest, which I think is
generally the right direction, and I need an async client API in the
benchmark epic.
This commit is contained in:
Christian Schwarz
2023-11-22 14:55:55 +00:00
committed by Christian Schwarz
parent 8460654f61
commit 4c5b7cff49
11 changed files with 371 additions and 252 deletions

2
Cargo.lock generated
View File

@@ -1254,6 +1254,7 @@ dependencies = [
"hyper",
"nix 0.26.2",
"once_cell",
"pageserver",
"pageserver_api",
"postgres",
"postgres_backend",
@@ -3161,6 +3162,7 @@ dependencies = [
"enum-map",
"hex",
"postgres_ffi",
"rand 0.8.5",
"serde",
"serde_json",
"serde_with",

View File

@@ -25,8 +25,7 @@ thiserror.workspace = true
toml.workspace = true
tokio.workspace = true
url.workspace = true
# Note: Do not directly depend on pageserver or safekeeper; use pageserver_api or safekeeper_api
# instead, so that recompile times are better.
pageserver = { path = "../pageserver" }
pageserver_api.workspace = true
postgres_backend.workspace = true
safekeeper_api.workspace = true

View File

@@ -9,7 +9,7 @@ pub struct AttachmentService {
env: LocalEnv,
listen: String,
path: PathBuf,
client: reqwest::blocking::Client,
client: reqwest::Client,
}
const COMMAND: &str = "attachment_service";
@@ -53,7 +53,7 @@ impl AttachmentService {
env: env.clone(),
path,
listen,
client: reqwest::blocking::ClientBuilder::new()
client: reqwest::ClientBuilder::new()
.build()
.expect("Failed to construct http client"),
}
@@ -84,7 +84,7 @@ impl AttachmentService {
}
/// Call into the attach_hook API, for use before handing out attachments to pageservers
pub fn attach_hook(
pub async fn attach_hook(
&self,
tenant_id: TenantId,
pageserver_id: NodeId,
@@ -104,16 +104,16 @@ impl AttachmentService {
node_id: Some(pageserver_id),
};
let response = self.client.post(url).json(&request).send()?;
let response = self.client.post(url).json(&request).send().await?;
if response.status() != StatusCode::OK {
return Err(anyhow!("Unexpected status {}", response.status()));
}
let response = response.json::<AttachHookResponse>()?;
let response = response.json::<AttachHookResponse>().await?;
Ok(response.gen)
}
pub fn inspect(&self, tenant_id: TenantId) -> anyhow::Result<Option<(u32, NodeId)>> {
pub async fn inspect(&self, tenant_id: TenantId) -> anyhow::Result<Option<(u32, NodeId)>> {
use hyper::StatusCode;
let url = self
@@ -126,12 +126,12 @@ impl AttachmentService {
let request = InspectRequest { tenant_id };
let response = self.client.post(url).json(&request).send()?;
let response = self.client.post(url).json(&request).send().await?;
if response.status() != StatusCode::OK {
return Err(anyhow!("Unexpected status {}", response.status()));
}
let response = response.json::<InspectResponse>()?;
let response = response.json::<InspectResponse>().await?;
Ok(response.attachment)
}
}

View File

@@ -120,15 +120,20 @@ fn main() -> Result<()> {
let mut env = LocalEnv::load_config().context("Error loading config")?;
let original_env = env.clone();
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let subcommand_result = match sub_name {
"tenant" => handle_tenant(sub_args, &mut env),
"timeline" => handle_timeline(sub_args, &mut env),
"tenant" => rt.block_on(handle_tenant(sub_args, &mut env)),
"timeline" => rt.block_on(handle_timeline(sub_args, &mut env)),
"start" => handle_start_all(sub_args, &env),
"stop" => handle_stop_all(sub_args, &env),
"pageserver" => handle_pageserver(sub_args, &env),
"pageserver" => rt.block_on(handle_pageserver(sub_args, &env)),
"attachment_service" => handle_attachment_service(sub_args, &env),
"safekeeper" => handle_safekeeper(sub_args, &env),
"endpoint" => handle_endpoint(sub_args, &env),
"endpoint" => rt.block_on(handle_endpoint(sub_args, &env)),
"mappings" => handle_mappings(sub_args, &mut env),
"pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"),
_ => bail!("unexpected subcommand {sub_name}"),
@@ -269,12 +274,13 @@ fn print_timeline(
/// Returns a map of timeline IDs to timeline_id@lsn strings.
/// Connects to the pageserver to query this information.
fn get_timeline_infos(
async fn get_timeline_infos(
env: &local_env::LocalEnv,
tenant_id: &TenantId,
) -> Result<HashMap<TimelineId, TimelineInfo>> {
Ok(get_default_pageserver(env)
.timeline_list(tenant_id)?
.timeline_list(tenant_id)
.await?
.into_iter()
.map(|timeline_info| (timeline_info.timeline_id, timeline_info))
.collect())
@@ -373,11 +379,14 @@ fn pageserver_config_overrides(init_match: &ArgMatches) -> Vec<&str> {
.collect()
}
fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> anyhow::Result<()> {
async fn handle_tenant(
tenant_match: &ArgMatches,
env: &mut local_env::LocalEnv,
) -> anyhow::Result<()> {
let pageserver = get_default_pageserver(env);
match tenant_match.subcommand() {
Some(("list", _)) => {
for t in pageserver.tenant_list()? {
for t in pageserver.tenant_list().await? {
println!("{} {:?}", t.id, t.state);
}
}
@@ -394,12 +403,16 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
// We must register the tenant with the attachment service, so
// that when the pageserver restarts, it will be re-attached.
let attachment_service = AttachmentService::from_env(env);
attachment_service.attach_hook(tenant_id, pageserver.conf.id)?
attachment_service
.attach_hook(tenant_id, pageserver.conf.id)
.await?
} else {
None
};
pageserver.tenant_create(tenant_id, generation, tenant_conf)?;
pageserver
.tenant_create(tenant_id, generation, tenant_conf)
.await?;
println!("tenant {tenant_id} successfully created on the pageserver");
// Create an initial timeline for the new tenant
@@ -409,14 +422,16 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
.copied()
.context("Failed to parse postgres version from the argument string")?;
let timeline_info = pageserver.timeline_create(
tenant_id,
new_timeline_id,
None,
None,
Some(pg_version),
None,
)?;
let timeline_info = pageserver
.timeline_create(
tenant_id,
new_timeline_id,
None,
None,
Some(pg_version),
None,
)
.await?;
let new_timeline_id = timeline_info.timeline_id;
let last_record_lsn = timeline_info.last_record_lsn;
@@ -450,6 +465,7 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
pageserver
.tenant_config(tenant_id, tenant_conf)
.await
.with_context(|| format!("Tenant config failed for tenant with id {tenant_id}"))?;
println!("tenant {tenant_id} successfully configured on the pageserver");
}
@@ -458,7 +474,7 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
let new_pageserver = get_pageserver(env, matches)?;
let new_pageserver_id = new_pageserver.conf.id;
migrate_tenant(env, tenant_id, new_pageserver)?;
migrate_tenant(env, tenant_id, new_pageserver).await?;
println!("tenant {tenant_id} migrated to {}", new_pageserver_id);
}
@@ -468,13 +484,13 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
Ok(())
}
fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> {
async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> {
let pageserver = get_default_pageserver(env);
match timeline_match.subcommand() {
Some(("list", list_match)) => {
let tenant_id = get_tenant_id(list_match, env)?;
let timelines = pageserver.timeline_list(&tenant_id)?;
let timelines = pageserver.timeline_list(&tenant_id).await?;
print_timelines_tree(timelines, env.timeline_name_mappings())?;
}
Some(("create", create_match)) => {
@@ -490,14 +506,16 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
let new_timeline_id_opt = parse_timeline_id(create_match)?;
let timeline_info = pageserver.timeline_create(
tenant_id,
new_timeline_id_opt,
None,
None,
Some(pg_version),
None,
)?;
let timeline_info = pageserver
.timeline_create(
tenant_id,
new_timeline_id_opt,
None,
None,
Some(pg_version),
None,
)
.await?;
let new_timeline_id = timeline_info.timeline_id;
let last_record_lsn = timeline_info.last_record_lsn;
@@ -578,14 +596,16 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
.map(|lsn_str| Lsn::from_str(lsn_str))
.transpose()
.context("Failed to parse ancestor start Lsn from the request")?;
let timeline_info = pageserver.timeline_create(
tenant_id,
None,
start_lsn,
Some(ancestor_timeline_id),
None,
None,
)?;
let timeline_info = pageserver
.timeline_create(
tenant_id,
None,
start_lsn,
Some(ancestor_timeline_id),
None,
None,
)
.await?;
let new_timeline_id = timeline_info.timeline_id;
let last_record_lsn = timeline_info.last_record_lsn;
@@ -604,7 +624,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
Ok(())
}
fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let (sub_name, sub_args) = match ep_match.subcommand() {
Some(ep_subcommand_data) => ep_subcommand_data,
None => bail!("no endpoint subcommand provided"),
@@ -614,10 +634,12 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
match sub_name {
"list" => {
let tenant_id = get_tenant_id(sub_args, env)?;
let timeline_infos = get_timeline_infos(env, &tenant_id).unwrap_or_else(|e| {
eprintln!("Failed to load timeline info: {}", e);
HashMap::new()
});
let timeline_infos = get_timeline_infos(env, &tenant_id)
.await
.unwrap_or_else(|e| {
eprintln!("Failed to load timeline info: {}", e);
HashMap::new()
});
let timeline_name_mappings = env.timeline_name_mappings();
@@ -791,7 +813,9 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
};
println!("Starting existing endpoint {endpoint_id}...");
endpoint.start(&auth_token, safekeepers, remote_ext_config)?;
endpoint
.start(&auth_token, safekeepers, remote_ext_config)
.await?;
}
"reconfigure" => {
let endpoint_id = sub_args
@@ -875,7 +899,7 @@ fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageSe
))
}
fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
match sub_match.subcommand() {
Some(("start", subcommand_args)) => {
if let Err(e) = get_pageserver(env, subcommand_args)?
@@ -927,7 +951,7 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
}
Some(("status", subcommand_args)) => {
match get_pageserver(env, subcommand_args)?.check_status() {
match get_pageserver(env, subcommand_args)?.check_status().await {
Ok(_) => println!("Page server is up and running"),
Err(err) => {
eprintln!("Page server is not available: {}", err);

View File

@@ -464,7 +464,7 @@ impl Endpoint {
}
}
pub fn start(
pub async fn start(
&self,
auth_token: &Option<String>,
safekeepers: Vec<NodeId>,
@@ -587,7 +587,7 @@ impl Endpoint {
const MAX_ATTEMPTS: u32 = 10 * 30; // Wait up to 30 s
loop {
attempt += 1;
match self.get_status() {
match self.get_status().await {
Ok(state) => {
match state.status {
ComputeStatus::Init => {
@@ -629,8 +629,8 @@ impl Endpoint {
}
// Call the /status HTTP API
pub fn get_status(&self) -> Result<ComputeState> {
let client = reqwest::blocking::Client::new();
pub async fn get_status(&self) -> Result<ComputeState> {
let client = reqwest::Client::new();
let response = client
.request(
@@ -641,16 +641,17 @@ impl Endpoint {
self.http_address.port()
),
)
.send()?;
.send()
.await?;
// Interpret the response
let status = response.status();
if !(status.is_client_error() || status.is_server_error()) {
Ok(response.json()?)
Ok(response.json().await?)
} else {
// reqwest does not export its error construction utility functions, so let's craft the message ourselves
let url = response.url().to_owned();
let msg = match response.text() {
let msg = match response.text().await {
Ok(err_body) => format!("Error: {}", err_body),
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
};

View File

@@ -7,27 +7,22 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::fs::File;
use std::io;
use std::io::{BufReader, Write};
use std::num::NonZeroU64;
use std::path::PathBuf;
use std::process::{Child, Command};
use std::time::Duration;
use std::{io, result};
use anyhow::{bail, Context};
use camino::Utf8PathBuf;
use pageserver_api::models::{
self, LocationConfig, TenantInfo, TenantLocationConfigRequest, TimelineInfo,
};
use pageserver::client::mgmt_api;
use pageserver_api::models::{self, LocationConfig, TenantInfo, TimelineInfo};
use pageserver_api::shard::TenantShardId;
use postgres_backend::AuthType;
use postgres_connection::{parse_host_port, PgConnectionConfig};
use reqwest::blocking::{Client, RequestBuilder, Response};
use reqwest::{IntoUrl, Method};
use thiserror::Error;
use utils::auth::{Claims, Scope};
use utils::{
http::error::HttpErrorBody,
id::{TenantId, TimelineId},
lsn::Lsn,
};
@@ -38,45 +33,6 @@ use crate::{background_process, local_env::LocalEnv};
/// Directory within .neon which will be used by default for LocalFs remote storage.
pub const PAGESERVER_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/pageserver";
#[derive(Error, Debug)]
pub enum PageserverHttpError {
#[error("Reqwest error: {0}")]
Transport(#[from] reqwest::Error),
#[error("Error: {0}")]
Response(String),
}
impl From<anyhow::Error> for PageserverHttpError {
fn from(e: anyhow::Error) -> Self {
Self::Response(e.to_string())
}
}
type Result<T> = result::Result<T, PageserverHttpError>;
pub trait ResponseErrorMessageExt: Sized {
fn error_from_body(self) -> Result<Self>;
}
impl ResponseErrorMessageExt for Response {
fn error_from_body(self) -> Result<Self> {
let status = self.status();
if !(status.is_client_error() || status.is_server_error()) {
return Ok(self);
}
// reqwest does not export its error construction utility functions, so let's craft the message ourselves
let url = self.url().to_owned();
Err(PageserverHttpError::Response(
match self.json::<HttpErrorBody>() {
Ok(err_body) => format!("Error: {}", err_body.msg),
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
},
))
}
}
//
// Control routines for pageserver.
//
@@ -87,8 +43,7 @@ pub struct PageServerNode {
pub pg_connection_config: PgConnectionConfig,
pub conf: PageServerConf,
pub env: LocalEnv,
pub http_client: Client,
pub http_base_url: String,
pub http_client: mgmt_api::Client,
}
impl PageServerNode {
@@ -100,8 +55,19 @@ impl PageServerNode {
pg_connection_config: PgConnectionConfig::new_host_port(host, port),
conf: conf.clone(),
env: env.clone(),
http_client: Client::new(),
http_base_url: format!("http://{}/v1", conf.listen_http_addr),
http_client: mgmt_api::Client::new(
format!("http://{}", conf.listen_http_addr),
{
match conf.http_auth_type {
AuthType::Trust => None,
AuthType::NeonJWT => Some(
env.generate_auth_token(&Claims::new(None, Scope::PageServerApi))
.unwrap(),
),
}
}
.as_deref(),
),
}
}
@@ -225,39 +191,52 @@ impl PageServerNode {
}
fn start_node(&self, config_overrides: &[&str], update_config: bool) -> anyhow::Result<Child> {
let datadir = self.repo_path();
print!(
"Starting pageserver node {} at '{}' in {:?}",
self.conf.id,
self.pg_connection_config.raw_address(),
datadir
);
io::stdout().flush()?;
// TODO: using a thread here because start_process() is not async but we need to call check_status()
std::thread::scope(move |s| {
s.spawn(move || {
let datadir = self.repo_path();
print!(
"Starting pageserver node {} at '{}' in {:?}",
self.conf.id,
self.pg_connection_config.raw_address(),
datadir
);
io::stdout().flush().context("flush stdout")?;
let datadir_path_str = datadir.to_str().with_context(|| {
format!(
"Cannot start pageserver node {} in path that has no string representation: {:?}",
self.conf.id, datadir,
)
})?;
let mut args = self.pageserver_basic_args(config_overrides, datadir_path_str);
if update_config {
args.push(Cow::Borrowed("--update-config"));
}
background_process::start_process(
"pageserver",
&datadir,
&self.env.pageserver_bin(),
args.iter().map(Cow::as_ref),
self.pageserver_env_variables()?,
background_process::InitialPidFile::Expect(&self.pid_file()),
|| match self.check_status() {
Ok(()) => Ok(true),
Err(PageserverHttpError::Transport(_)) => Ok(false),
Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")),
},
)
let datadir_path_str = datadir.to_str().with_context(|| {
format!(
"Cannot start pageserver node {} in path that has no string representation: {:?}",
self.conf.id, datadir,
)
})?;
let mut args = self.pageserver_basic_args(config_overrides, datadir_path_str);
if update_config {
args.push(Cow::Borrowed("--update-config"));
}
background_process::start_process(
"pageserver",
&datadir,
&self.env.pageserver_bin(),
args.iter().map(Cow::as_ref),
self.pageserver_env_variables()?,
background_process::InitialPidFile::Expect(&self.pid_file()),
|| {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let st = rt.block_on(self.check_status());
match st {
Ok(()) => Ok(true),
Err(mgmt_api::Error::ReceiveBody(_)) => Ok(false),
Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")),
}
},
)
})
.join()
.unwrap()
})
}
fn pageserver_basic_args<'a>(
@@ -314,33 +293,15 @@ impl PageServerNode {
Ok(config.connect_no_tls()?)
}
fn http_request<U: IntoUrl>(&self, method: Method, url: U) -> anyhow::Result<RequestBuilder> {
let mut builder = self.http_client.request(method, url);
if self.conf.http_auth_type == AuthType::NeonJWT {
let token = self
.env
.generate_auth_token(&Claims::new(None, Scope::PageServerApi))?;
builder = builder.bearer_auth(token)
}
Ok(builder)
pub async fn check_status(&self) -> mgmt_api::Result<()> {
self.http_client.status().await
}
pub fn check_status(&self) -> Result<()> {
self.http_request(Method::GET, format!("{}/status", self.http_base_url))?
.send()?
.error_from_body()?;
Ok(())
pub async fn tenant_list(&self) -> mgmt_api::Result<Vec<TenantInfo>> {
self.http_client.list_tenants().await
}
pub fn tenant_list(&self) -> Result<Vec<TenantInfo>> {
Ok(self
.http_request(Method::GET, format!("{}/tenant", self.http_base_url))?
.send()?
.error_from_body()?
.json()?)
}
pub fn tenant_create(
pub async fn tenant_create(
&self,
new_tenant_id: TenantId,
generation: Option<u32>,
@@ -417,23 +378,10 @@ impl PageServerNode {
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")
}
self.http_request(Method::POST, format!("{}/tenant", self.http_base_url))?
.json(&request)
.send()?
.error_from_body()?
.json::<Option<String>>()
.with_context(|| {
format!("Failed to parse tenant creation response for tenant id: {new_tenant_id:?}")
})?
.context("No tenant id was found in the tenant creation response")
.and_then(|tenant_id_string| {
tenant_id_string.parse().with_context(|| {
format!("Failed to parse response string as tenant id: '{tenant_id_string}'")
})
})
Ok(self.http_client.tenant_create(&request).await?)
}
pub fn tenant_config(
pub async fn tenant_config(
&self,
tenant_id: TenantId,
mut settings: HashMap<&str, &str>,
@@ -511,54 +459,30 @@ impl PageServerNode {
bail!("Unrecognized tenant settings: {settings:?}")
}
self.http_request(Method::PUT, format!("{}/tenant/config", self.http_base_url))?
.json(&models::TenantConfigRequest { tenant_id, config })
.send()?
.error_from_body()?;
self.http_client
.tenant_config(&models::TenantConfigRequest { tenant_id, config })
.await?;
Ok(())
}
pub fn location_config(
pub async fn location_config(
&self,
tenant_id: TenantId,
config: LocationConfig,
flush_ms: Option<Duration>,
) -> anyhow::Result<()> {
let req_body = TenantLocationConfigRequest { tenant_id, config };
let path = format!(
"{}/tenant/{}/location_config",
self.http_base_url, tenant_id
);
let path = if let Some(flush_ms) = flush_ms {
format!("{}?flush_ms={}", path, flush_ms.as_millis())
} else {
path
};
self.http_request(Method::PUT, path)?
.json(&req_body)
.send()?
.error_from_body()?;
Ok(())
Ok(self
.http_client
.location_config(tenant_id, config, flush_ms)
.await?)
}
pub fn timeline_list(&self, tenant_id: &TenantId) -> anyhow::Result<Vec<TimelineInfo>> {
let timeline_infos: Vec<TimelineInfo> = self
.http_request(
Method::GET,
format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id),
)?
.send()?
.error_from_body()?
.json()?;
Ok(timeline_infos)
pub async fn timeline_list(&self, tenant_id: &TenantId) -> anyhow::Result<Vec<TimelineInfo>> {
Ok(self.http_client.list_timelines(*tenant_id).await?)
}
pub fn timeline_create(
pub async fn timeline_create(
&self,
tenant_id: TenantId,
new_timeline_id: Option<TimelineId>,
@@ -569,29 +493,14 @@ impl PageServerNode {
) -> anyhow::Result<TimelineInfo> {
// If timeline ID was not specified, generate one
let new_timeline_id = new_timeline_id.unwrap_or(TimelineId::generate());
self.http_request(
Method::POST,
format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id),
)?
.json(&models::TimelineCreateRequest {
let req = models::TimelineCreateRequest {
new_timeline_id,
ancestor_start_lsn,
ancestor_timeline_id,
pg_version,
existing_initdb_timeline_id,
})
.send()?
.error_from_body()?
.json::<Option<TimelineInfo>>()
.with_context(|| {
format!("Failed to parse timeline creation response for tenant id: {tenant_id}")
})?
.with_context(|| {
format!(
"No timeline id was found in the timeline creation response for tenant {tenant_id}"
)
})
};
Ok(self.http_client.timeline_create(tenant_id, &req).await?)
}
/// Import a basebackup prepared using either:

View File

@@ -19,11 +19,11 @@ use utils::{
};
/// Given an attached pageserver, retrieve the LSN for all timelines
fn get_lsns(
async fn get_lsns(
tenant_id: TenantId,
pageserver: &PageServerNode,
) -> anyhow::Result<HashMap<TimelineId, Lsn>> {
let timelines = pageserver.timeline_list(&tenant_id)?;
let timelines = pageserver.timeline_list(&tenant_id).await?;
Ok(timelines
.into_iter()
.map(|t| (t.timeline_id, t.last_record_lsn))
@@ -32,13 +32,13 @@ fn get_lsns(
/// Wait for the timeline LSNs on `pageserver` to catch up with or overtake
/// `baseline`.
fn await_lsn(
async fn await_lsn(
tenant_id: TenantId,
pageserver: &PageServerNode,
baseline: HashMap<TimelineId, Lsn>,
) -> anyhow::Result<()> {
loop {
let latest = match get_lsns(tenant_id, pageserver) {
let latest = match get_lsns(tenant_id, pageserver).await {
Ok(l) => l,
Err(e) => {
println!(
@@ -84,7 +84,7 @@ fn await_lsn(
/// - Coordinate attach/secondary/detach on pageservers
/// - call into attachment_service for generations
/// - reconfigure compute endpoints to point to new attached pageserver
pub fn migrate_tenant(
pub async fn migrate_tenant(
env: &LocalEnv,
tenant_id: TenantId,
dest_ps: PageServerNode,
@@ -108,16 +108,18 @@ pub fn migrate_tenant(
}
}
let previous = attachment_service.inspect(tenant_id)?;
let previous = attachment_service.inspect(tenant_id).await?;
let mut baseline_lsns = None;
if let Some((generation, origin_ps_id)) = &previous {
let origin_ps = PageServerNode::from_env(env, env.get_pageserver_conf(*origin_ps_id)?);
if origin_ps_id == &dest_ps.conf.id {
println!("🔁 Already attached to {origin_ps_id}, freshening...");
let gen = attachment_service.attach_hook(tenant_id, dest_ps.conf.id)?;
let gen = attachment_service
.attach_hook(tenant_id, dest_ps.conf.id)
.await?;
let dest_conf = build_location_config(LocationConfigMode::AttachedSingle, gen, None);
dest_ps.location_config(tenant_id, dest_conf, None)?;
dest_ps.location_config(tenant_id, dest_conf, None).await?;
println!("✅ Migration complete");
return Ok(());
}
@@ -126,20 +128,24 @@ pub fn migrate_tenant(
let stale_conf =
build_location_config(LocationConfigMode::AttachedStale, Some(*generation), None);
origin_ps.location_config(tenant_id, stale_conf, Some(Duration::from_secs(10)))?;
origin_ps
.location_config(tenant_id, stale_conf, Some(Duration::from_secs(10)))
.await?;
baseline_lsns = Some(get_lsns(tenant_id, &origin_ps)?);
baseline_lsns = Some(get_lsns(tenant_id, &origin_ps).await?);
}
let gen = attachment_service.attach_hook(tenant_id, dest_ps.conf.id)?;
let gen = attachment_service
.attach_hook(tenant_id, dest_ps.conf.id)
.await?;
let dest_conf = build_location_config(LocationConfigMode::AttachedMulti, gen, None);
println!("🔁 Attaching to pageserver {}", dest_ps.conf.id);
dest_ps.location_config(tenant_id, dest_conf, None)?;
dest_ps.location_config(tenant_id, dest_conf, None).await?;
if let Some(baseline) = baseline_lsns {
println!("🕑 Waiting for LSN to catch up...");
await_lsn(tenant_id, &dest_ps, baseline)?;
await_lsn(tenant_id, &dest_ps, baseline).await?;
}
let cplane = ComputeControlPlane::load(env.clone())?;
@@ -159,7 +165,7 @@ pub fn migrate_tenant(
}
let other_ps = PageServerNode::from_env(env, other_ps_conf);
let other_ps_tenants = other_ps.tenant_list()?;
let other_ps_tenants = other_ps.tenant_list().await?;
// Check if this tenant is attached
let found = other_ps_tenants
@@ -181,7 +187,9 @@ pub fn migrate_tenant(
"💤 Switching to secondary mode on pageserver {}",
other_ps.conf.id
);
other_ps.location_config(tenant_id, secondary_conf, None)?;
other_ps
.location_config(tenant_id, secondary_conf, None)
.await?;
}
println!(
@@ -189,7 +197,7 @@ pub fn migrate_tenant(
dest_ps.conf.id
);
let dest_conf = build_location_config(LocationConfigMode::AttachedSingle, gen, None);
dest_ps.location_config(tenant_id, dest_conf, None)?;
dest_ps.location_config(tenant_id, dest_conf, None).await?;
println!("✅ Migration complete");

View File

@@ -15,6 +15,7 @@ byteorder.workspace = true
utils.workspace = true
postgres_ffi.workspace = true
enum-map.workspace = true
rand.workspace = true
strum.workspace = true
strum_macros.workspace = true
hex.workspace = true

1
pageserver/src/client.rs Normal file
View File

@@ -0,0 +1 @@
pub mod mgmt_api;

View File

@@ -0,0 +1,173 @@
use hyper::{header::AUTHORIZATION, http::method::Method};
use pageserver_api::models::*;
use reqwest::IntoUrl;
use utils::{
http::error::HttpErrorBody,
id::{TenantId, TimelineId},
};
#[derive(Debug)]
pub struct Client {
mgmt_api_endpoint: String,
authorization_header: Option<String>,
client: reqwest::Client,
}
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("receive body: {0}")]
ReceiveBody(reqwest::Error),
#[error("receive error body: {0}")]
ReceiveErrorBody(String),
#[error("pageserver API: {0}")]
ApiError(String),
}
pub type Result<T> = std::result::Result<T, Error>;
#[async_trait::async_trait]
pub trait ResponseErrorMessageExt: Sized {
async fn error_from_body(self) -> Result<Self>;
}
#[async_trait::async_trait]
impl ResponseErrorMessageExt for reqwest::Response {
async fn error_from_body(mut self) -> Result<Self> {
let status = self.status();
if !(status.is_client_error() || status.is_server_error()) {
return Ok(self);
}
let url = self.url().to_owned();
Err(match self.json::<HttpErrorBody>().await {
Ok(HttpErrorBody { msg }) => Error::ApiError(msg),
Err(_) => {
Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), url))
}
})
}
}
impl Client {
pub fn new(mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
Self {
mgmt_api_endpoint,
authorization_header: jwt.map(|jwt| format!("Bearer {jwt}")),
client: reqwest::Client::new(),
}
}
pub async fn list_tenants(&self) -> Result<Vec<pageserver_api::models::TenantInfo>> {
let uri = format!("{}/v1/tenant", self.mgmt_api_endpoint);
let resp = self.get(&uri).await?;
resp.json().await.map_err(Error::ReceiveBody)
}
pub async fn list_timelines(
&self,
tenant_id: TenantId,
) -> Result<Vec<pageserver_api::models::TimelineInfo>> {
let uri = format!("{}/v1/tenant/{tenant_id}/timeline", self.mgmt_api_endpoint);
self.get(&uri)
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
pub async fn timeline_info(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<pageserver_api::models::TimelineInfo> {
let uri = format!(
"{}/v1/tenant/{tenant_id}/timeline/{timeline_id}",
self.mgmt_api_endpoint
);
self.get(&uri)
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
async fn get<U: IntoUrl>(&self, uri: U) -> Result<reqwest::Response> {
self.request(Method::GET, uri, ()).await
}
async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
body: B,
) -> Result<reqwest::Response> {
let req = self.client.request(method, uri);
let req = if let Some(value) = &self.authorization_header {
req.header(AUTHORIZATION, value)
} else {
req
};
let res = req.json(&body).send().await.map_err(Error::ReceiveBody)?;
let response = res.error_from_body().await?;
Ok(response)
}
pub async fn status(&self) -> Result<()> {
let uri = format!("{}/v1/status", self.mgmt_api_endpoint);
self.get(&uri).await?;
Ok(())
}
pub async fn tenant_create(&self, req: &TenantCreateRequest) -> Result<TenantId> {
let uri = format!("{}/v1/tenant", self.mgmt_api_endpoint);
self.request(Method::POST, &uri, req)
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
pub async fn tenant_config(&self, req: &TenantConfigRequest) -> Result<()> {
let uri = format!("{}/v1/tenant/config", self.mgmt_api_endpoint);
self.request(Method::PUT, &uri, req).await?;
Ok(())
}
pub async fn location_config(
&self,
tenant_id: TenantId,
config: LocationConfig,
flush_ms: Option<std::time::Duration>,
) -> Result<()> {
let req_body = TenantLocationConfigRequest { tenant_id, config };
let path = format!(
"{}/v1/tenant/{}/location_config",
self.mgmt_api_endpoint, tenant_id
);
let path = if let Some(flush_ms) = flush_ms {
format!("{}?flush_ms={}", path, flush_ms.as_millis())
} else {
path
};
self.request(Method::PUT, &path, &req_body).await?;
Ok(())
}
pub async fn timeline_create(
&self,
tenant_id: TenantId,
req: &TimelineCreateRequest,
) -> Result<TimelineInfo> {
let uri = format!(
"{}/v1/tenant/{}/timeline",
self.mgmt_api_endpoint, tenant_id
);
self.request(Method::POST, &uri, req)
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
}

View File

@@ -25,6 +25,7 @@ pub mod walingest;
pub mod walrecord;
pub mod walredo;
pub mod client;
pub mod failpoint_support;
use crate::task_mgr::TaskKind;