mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-22 07:30:37 +00:00
control_plane: major rework of attachment_service
This commit is contained in:
@@ -1,14 +1,27 @@
|
||||
use crate::{background_process, local_env::LocalEnv};
|
||||
use anyhow::anyhow;
|
||||
use camino::Utf8PathBuf;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{path::PathBuf, process::Child};
|
||||
use utils::id::{NodeId, TenantId};
|
||||
use hyper::{Method, StatusCode};
|
||||
use pageserver_api::{
|
||||
models::{ShardParameters, TenantCreateRequest, TimelineCreateRequest, TimelineInfo},
|
||||
shard::TenantShardId,
|
||||
};
|
||||
use postgres_backend::AuthType;
|
||||
use postgres_connection::parse_host_port;
|
||||
use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
||||
use std::{path::PathBuf, process::Child, str::FromStr};
|
||||
use tracing::instrument;
|
||||
use utils::{
|
||||
auth::{Claims, Scope},
|
||||
id::{NodeId, TenantId},
|
||||
};
|
||||
|
||||
pub struct AttachmentService {
|
||||
env: LocalEnv,
|
||||
listen: String,
|
||||
path: PathBuf,
|
||||
jwt_token: Option<String>,
|
||||
public_key_path: Option<Utf8PathBuf>,
|
||||
client: reqwest::Client,
|
||||
}
|
||||
|
||||
@@ -16,7 +29,7 @@ const COMMAND: &str = "attachment_service";
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct AttachHookRequest {
|
||||
pub tenant_id: TenantId,
|
||||
pub tenant_shard_id: TenantShardId,
|
||||
pub node_id: Option<NodeId>,
|
||||
}
|
||||
|
||||
@@ -27,7 +40,7 @@ pub struct AttachHookResponse {
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct InspectRequest {
|
||||
pub tenant_id: TenantId,
|
||||
pub tenant_shard_id: TenantShardId,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
@@ -35,6 +48,117 @@ pub struct InspectResponse {
|
||||
pub attachment: Option<(u32, NodeId)>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct TenantCreateResponseShard {
|
||||
pub node_id: NodeId,
|
||||
pub generation: u32,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct TenantCreateResponse {
|
||||
pub shards: Vec<TenantCreateResponseShard>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct NodeRegisterRequest {
|
||||
pub node_id: NodeId,
|
||||
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_pg_port: u16,
|
||||
|
||||
pub listen_http_addr: String,
|
||||
pub listen_http_port: u16,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct NodeConfigureRequest {
|
||||
pub node_id: NodeId,
|
||||
|
||||
pub availability: Option<NodeAvailability>,
|
||||
pub scheduling: Option<NodeSchedulingPolicy>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct TenantLocateResponseShard {
|
||||
pub shard_id: TenantShardId,
|
||||
pub node_id: NodeId,
|
||||
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_pg_port: u16,
|
||||
|
||||
pub listen_http_addr: String,
|
||||
pub listen_http_port: u16,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct TenantLocateResponse {
|
||||
pub shards: Vec<TenantLocateResponseShard>,
|
||||
pub shard_params: ShardParameters,
|
||||
}
|
||||
|
||||
/// Explicitly migrating a particular shard is a low level operation
|
||||
/// TODO: higher level "Reschedule tenant" operation where the request
|
||||
/// specifies some constraints, e.g. asking it to get off particular node(s)
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct TenantShardMigrateRequest {
|
||||
pub tenant_shard_id: TenantShardId,
|
||||
pub node_id: NodeId,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Copy)]
|
||||
pub enum NodeAvailability {
|
||||
// Normal, happy state
|
||||
Active,
|
||||
// Offline: Tenants shouldn't try to attach here, but they may assume that their
|
||||
// secondary locations on this node still exist. Newly added nodes are in this
|
||||
// state until we successfully contact them.
|
||||
Offline,
|
||||
}
|
||||
|
||||
impl FromStr for NodeAvailability {
|
||||
type Err = anyhow::Error;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
match s {
|
||||
"active" => Ok(Self::Active),
|
||||
"offline" => Ok(Self::Offline),
|
||||
_ => Err(anyhow::anyhow!("Unknown availability state '{s}'")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Copy)]
|
||||
pub enum NodeSchedulingPolicy {
|
||||
// Normal, happy state
|
||||
Active,
|
||||
|
||||
// A newly added node: gradually move some work here.
|
||||
Filling,
|
||||
|
||||
// Do not schedule new work here, but leave configured locations in place.
|
||||
Pause,
|
||||
|
||||
// Do not schedule work here. Gracefully move work away, as resources allow.
|
||||
Draining,
|
||||
}
|
||||
|
||||
impl FromStr for NodeSchedulingPolicy {
|
||||
type Err = anyhow::Error;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
match s {
|
||||
"active" => Ok(Self::Active),
|
||||
"filling" => Ok(Self::Filling),
|
||||
"pause" => Ok(Self::Pause),
|
||||
"draining" => Ok(Self::Draining),
|
||||
_ => Err(anyhow::anyhow!("Unknown scheduling state '{s}'")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct TenantShardMigrateResponse {}
|
||||
|
||||
impl AttachmentService {
|
||||
pub fn from_env(env: &LocalEnv) -> Self {
|
||||
let path = env.base_data_dir.join("attachments.json");
|
||||
@@ -49,10 +173,34 @@ impl AttachmentService {
|
||||
listen_url.port().unwrap()
|
||||
);
|
||||
|
||||
// Assume all pageservers have symmetric auth configuration: this service
|
||||
// expects to use one JWT token to talk to all of them.
|
||||
let ps_conf = env
|
||||
.pageservers
|
||||
.first()
|
||||
.expect("Config is validated to contain at least one pageserver");
|
||||
let (jwt_token, public_key_path) = match ps_conf.http_auth_type {
|
||||
AuthType::Trust => (None, None),
|
||||
AuthType::NeonJWT => {
|
||||
let jwt_token = env
|
||||
.generate_auth_token(&Claims::new(None, Scope::PageServerApi))
|
||||
.unwrap();
|
||||
|
||||
// If pageserver auth is enabled, this implicitly enables auth for this service,
|
||||
// using the same credentials.
|
||||
let public_key_path =
|
||||
camino::Utf8PathBuf::try_from(env.base_data_dir.join("auth_public_key.pem"))
|
||||
.unwrap();
|
||||
(Some(jwt_token), Some(public_key_path))
|
||||
}
|
||||
};
|
||||
|
||||
Self {
|
||||
env: env.clone(),
|
||||
path,
|
||||
listen,
|
||||
jwt_token,
|
||||
public_key_path,
|
||||
client: reqwest::ClientBuilder::new()
|
||||
.build()
|
||||
.expect("Failed to construct http client"),
|
||||
@@ -67,72 +215,202 @@ impl AttachmentService {
|
||||
pub async fn start(&self) -> anyhow::Result<Child> {
|
||||
let path_str = self.path.to_string_lossy();
|
||||
|
||||
background_process::start_process(
|
||||
let mut args = vec!["-l", &self.listen, "-p", &path_str]
|
||||
.into_iter()
|
||||
.map(|s| s.to_string())
|
||||
.collect::<Vec<_>>();
|
||||
if let Some(jwt_token) = &self.jwt_token {
|
||||
args.push(format!("--jwt-token={jwt_token}"));
|
||||
}
|
||||
|
||||
if let Some(public_key_path) = &self.public_key_path {
|
||||
args.push(format!("--public-key={public_key_path}"));
|
||||
}
|
||||
|
||||
let result = background_process::start_process(
|
||||
COMMAND,
|
||||
&self.env.base_data_dir,
|
||||
&self.env.attachment_service_bin(),
|
||||
["-l", &self.listen, "-p", &path_str],
|
||||
args,
|
||||
[],
|
||||
background_process::InitialPidFile::Create(self.pid_file()),
|
||||
// TODO: a real status check
|
||||
|| async move { anyhow::Ok(true) },
|
||||
|| async {
|
||||
match self.status().await {
|
||||
Ok(_) => Ok(true),
|
||||
Err(_) => Ok(false),
|
||||
}
|
||||
},
|
||||
)
|
||||
.await
|
||||
.await;
|
||||
|
||||
for ps_conf in &self.env.pageservers {
|
||||
let (pg_host, pg_port) =
|
||||
parse_host_port(&ps_conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
|
||||
let (http_host, http_port) = parse_host_port(&ps_conf.listen_http_addr)
|
||||
.expect("Unable to parse listen_http_addr");
|
||||
self.node_register(NodeRegisterRequest {
|
||||
node_id: ps_conf.id,
|
||||
listen_pg_addr: pg_host.to_string(),
|
||||
listen_pg_port: pg_port.unwrap_or(5432),
|
||||
listen_http_addr: http_host.to_string(),
|
||||
listen_http_port: http_port.unwrap_or(80),
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
|
||||
background_process::stop_process(immediate, COMMAND, &self.pid_file())
|
||||
}
|
||||
|
||||
/// Call into the attach_hook API, for use before handing out attachments to pageservers
|
||||
pub async fn attach_hook(
|
||||
/// Simple HTTP request wrapper for calling into attachment service
|
||||
async fn dispatch<RQ, RS>(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
pageserver_id: NodeId,
|
||||
) -> anyhow::Result<Option<u32>> {
|
||||
use hyper::StatusCode;
|
||||
|
||||
method: hyper::Method,
|
||||
path: String,
|
||||
body: Option<RQ>,
|
||||
) -> anyhow::Result<RS>
|
||||
where
|
||||
RQ: Serialize + Sized,
|
||||
RS: DeserializeOwned + Sized,
|
||||
{
|
||||
let url = self
|
||||
.env
|
||||
.control_plane_api
|
||||
.clone()
|
||||
.unwrap()
|
||||
.join("attach-hook")
|
||||
.join(&path)
|
||||
.unwrap();
|
||||
|
||||
let mut builder = self.client.request(method, url);
|
||||
if let Some(body) = body {
|
||||
builder = builder.json(&body)
|
||||
}
|
||||
if let Some(jwt_token) = &self.jwt_token {
|
||||
eprintln!("dispatch: using token {jwt_token}");
|
||||
builder = builder.header(
|
||||
reqwest::header::AUTHORIZATION,
|
||||
format!("Bearer {jwt_token}"),
|
||||
);
|
||||
} else {
|
||||
eprintln!("dispatch: no JWT token");
|
||||
}
|
||||
|
||||
let response = builder.send().await?;
|
||||
if response.status() != StatusCode::OK {
|
||||
return Err(anyhow!(
|
||||
"Unexpected status {} on {}",
|
||||
response.status(),
|
||||
path
|
||||
));
|
||||
}
|
||||
|
||||
Ok(response.json().await?)
|
||||
}
|
||||
|
||||
/// Call into the attach_hook API, for use before handing out attachments to pageservers
|
||||
#[instrument(skip(self))]
|
||||
pub async fn attach_hook(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
pageserver_id: NodeId,
|
||||
) -> anyhow::Result<Option<u32>> {
|
||||
let request = AttachHookRequest {
|
||||
tenant_id,
|
||||
tenant_shard_id,
|
||||
node_id: Some(pageserver_id),
|
||||
};
|
||||
|
||||
let response = self.client.post(url).json(&request).send().await?;
|
||||
if response.status() != StatusCode::OK {
|
||||
return Err(anyhow!("Unexpected status {}", response.status()));
|
||||
}
|
||||
let response = self
|
||||
.dispatch::<_, AttachHookResponse>(
|
||||
Method::POST,
|
||||
"attach-hook".to_string(),
|
||||
Some(request),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let response = response.json::<AttachHookResponse>().await?;
|
||||
Ok(response.gen)
|
||||
}
|
||||
|
||||
pub async fn inspect(&self, tenant_id: TenantId) -> anyhow::Result<Option<(u32, NodeId)>> {
|
||||
use hyper::StatusCode;
|
||||
#[instrument(skip(self))]
|
||||
pub async fn inspect(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
) -> anyhow::Result<Option<(u32, NodeId)>> {
|
||||
let request = InspectRequest { tenant_shard_id };
|
||||
|
||||
let url = self
|
||||
.env
|
||||
.control_plane_api
|
||||
.clone()
|
||||
.unwrap()
|
||||
.join("inspect")
|
||||
.unwrap();
|
||||
let response = self
|
||||
.dispatch::<_, InspectResponse>(Method::POST, "inspect".to_string(), Some(request))
|
||||
.await?;
|
||||
|
||||
let request = InspectRequest { tenant_id };
|
||||
|
||||
let response = self.client.post(url).json(&request).send().await?;
|
||||
if response.status() != StatusCode::OK {
|
||||
return Err(anyhow!("Unexpected status {}", response.status()));
|
||||
}
|
||||
|
||||
let response = response.json::<InspectResponse>().await?;
|
||||
Ok(response.attachment)
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub async fn tenant_create(
|
||||
&self,
|
||||
req: TenantCreateRequest,
|
||||
) -> anyhow::Result<TenantCreateResponse> {
|
||||
self.dispatch(Method::POST, "tenant".to_string(), Some(req))
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub async fn tenant_locate(&self, tenant_id: TenantId) -> anyhow::Result<TenantLocateResponse> {
|
||||
self.dispatch::<(), _>(Method::GET, format!("tenant/{tenant_id}/locate"), None)
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub async fn tenant_migrate(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
node_id: NodeId,
|
||||
) -> anyhow::Result<TenantShardMigrateResponse> {
|
||||
self.dispatch(
|
||||
Method::PUT,
|
||||
format!("tenant/{tenant_shard_id}/migrate"),
|
||||
Some(TenantShardMigrateRequest {
|
||||
tenant_shard_id,
|
||||
node_id,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(node_id=%req.node_id))]
|
||||
pub async fn node_register(&self, req: NodeRegisterRequest) -> anyhow::Result<()> {
|
||||
self.dispatch::<_, ()>(Method::POST, "node".to_string(), Some(req))
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(node_id=%req.node_id))]
|
||||
pub async fn node_configure(&self, req: NodeConfigureRequest) -> anyhow::Result<()> {
|
||||
self.dispatch::<_, ()>(
|
||||
Method::PUT,
|
||||
format!("node/{}/config", req.node_id),
|
||||
Some(req),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub async fn status(&self) -> anyhow::Result<()> {
|
||||
self.dispatch::<(), ()>(Method::GET, "status".to_string(), None)
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(%tenant_id, timeline_id=%req.new_timeline_id))]
|
||||
pub async fn tenant_timeline_create(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
req: TimelineCreateRequest,
|
||||
) -> anyhow::Result<TimelineInfo> {
|
||||
self.dispatch(
|
||||
Method::POST,
|
||||
format!("tenant/{tenant_id}/timeline"),
|
||||
Some(req),
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,355 +0,0 @@
|
||||
/// The attachment service mimics the aspects of the control plane API
|
||||
/// that are required for a pageserver to operate.
|
||||
///
|
||||
/// This enables running & testing pageservers without a full-blown
|
||||
/// deployment of the Neon cloud platform.
|
||||
///
|
||||
use anyhow::anyhow;
|
||||
use clap::Parser;
|
||||
use hex::FromHex;
|
||||
use hyper::StatusCode;
|
||||
use hyper::{Body, Request, Response};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use utils::http::endpoint::request_span;
|
||||
use utils::logging::{self, LogFormat};
|
||||
use utils::signals::{ShutdownSignals, Signal};
|
||||
|
||||
use utils::{
|
||||
http::{
|
||||
endpoint::{self},
|
||||
error::ApiError,
|
||||
json::{json_request, json_response},
|
||||
RequestExt, RouterBuilder,
|
||||
},
|
||||
id::{NodeId, TenantId},
|
||||
tcp_listener,
|
||||
};
|
||||
|
||||
use pageserver_api::control_api::{
|
||||
ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest, ValidateResponse,
|
||||
ValidateResponseTenant,
|
||||
};
|
||||
|
||||
use control_plane::attachment_service::{
|
||||
AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse,
|
||||
};
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(author, version, about, long_about = None)]
|
||||
#[command(arg_required_else_help(true))]
|
||||
struct Cli {
|
||||
/// Host and port to listen on, like `127.0.0.1:1234`
|
||||
#[arg(short, long)]
|
||||
listen: std::net::SocketAddr,
|
||||
|
||||
/// Path to the .json file to store state (will be created if it doesn't exist)
|
||||
#[arg(short, long)]
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
// The persistent state of each Tenant
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
struct TenantState {
|
||||
// Currently attached pageserver
|
||||
pageserver: Option<NodeId>,
|
||||
|
||||
// Latest generation number: next time we attach, increment this
|
||||
// and use the incremented number when attaching
|
||||
generation: u32,
|
||||
}
|
||||
|
||||
fn to_hex_map<S, V>(input: &HashMap<TenantId, V>, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
V: Clone + Serialize,
|
||||
{
|
||||
let transformed = input.iter().map(|(k, v)| (hex::encode(k), v.clone()));
|
||||
|
||||
transformed
|
||||
.collect::<HashMap<String, V>>()
|
||||
.serialize(serializer)
|
||||
}
|
||||
|
||||
fn from_hex_map<'de, D, V>(deserializer: D) -> Result<HashMap<TenantId, V>, D::Error>
|
||||
where
|
||||
D: serde::de::Deserializer<'de>,
|
||||
V: Deserialize<'de>,
|
||||
{
|
||||
let hex_map = HashMap::<String, V>::deserialize(deserializer)?;
|
||||
hex_map
|
||||
.into_iter()
|
||||
.map(|(k, v)| {
|
||||
TenantId::from_hex(k)
|
||||
.map(|k| (k, v))
|
||||
.map_err(serde::de::Error::custom)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
// Top level state available to all HTTP handlers
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct PersistentState {
|
||||
#[serde(serialize_with = "to_hex_map", deserialize_with = "from_hex_map")]
|
||||
tenants: HashMap<TenantId, TenantState>,
|
||||
|
||||
#[serde(skip)]
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
impl PersistentState {
|
||||
async fn save(&self) -> anyhow::Result<()> {
|
||||
let bytes = serde_json::to_vec(self)?;
|
||||
tokio::fs::write(&self.path, &bytes).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn load(path: &Path) -> anyhow::Result<Self> {
|
||||
let bytes = tokio::fs::read(path).await?;
|
||||
let mut decoded = serde_json::from_slice::<Self>(&bytes)?;
|
||||
decoded.path = path.to_owned();
|
||||
Ok(decoded)
|
||||
}
|
||||
|
||||
async fn load_or_new(path: &Path) -> Self {
|
||||
match Self::load(path).await {
|
||||
Ok(s) => {
|
||||
tracing::info!("Loaded state file at {}", path.display());
|
||||
s
|
||||
}
|
||||
Err(e)
|
||||
if e.downcast_ref::<std::io::Error>()
|
||||
.map(|e| e.kind() == std::io::ErrorKind::NotFound)
|
||||
.unwrap_or(false) =>
|
||||
{
|
||||
tracing::info!("Will create state file at {}", path.display());
|
||||
Self {
|
||||
tenants: HashMap::new(),
|
||||
path: path.to_owned(),
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
panic!("Failed to load state from '{}': {e:#} (maybe your .neon/ dir was written by an older version?)", path.display())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// State available to HTTP request handlers
|
||||
#[derive(Clone)]
|
||||
struct State {
|
||||
inner: Arc<tokio::sync::RwLock<PersistentState>>,
|
||||
}
|
||||
|
||||
impl State {
|
||||
fn new(persistent_state: PersistentState) -> State {
|
||||
Self {
|
||||
inner: Arc::new(tokio::sync::RwLock::new(persistent_state)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn get_state(request: &Request<Body>) -> &State {
|
||||
request
|
||||
.data::<Arc<State>>()
|
||||
.expect("unknown state type")
|
||||
.as_ref()
|
||||
}
|
||||
|
||||
/// Pageserver calls into this on startup, to learn which tenants it should attach
|
||||
async fn handle_re_attach(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let reattach_req = json_request::<ReAttachRequest>(&mut req).await?;
|
||||
|
||||
let state = get_state(&req).inner.clone();
|
||||
let mut locked = state.write().await;
|
||||
|
||||
let mut response = ReAttachResponse {
|
||||
tenants: Vec::new(),
|
||||
};
|
||||
for (t, state) in &mut locked.tenants {
|
||||
if state.pageserver == Some(reattach_req.node_id) {
|
||||
state.generation += 1;
|
||||
response.tenants.push(ReAttachResponseTenant {
|
||||
// TODO(sharding): make this shard-aware
|
||||
id: TenantShardId::unsharded(*t),
|
||||
gen: state.generation,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
locked.save().await.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, response)
|
||||
}
|
||||
|
||||
/// Pageserver calls into this before doing deletions, to confirm that it still
|
||||
/// holds the latest generation for the tenants with deletions enqueued
|
||||
async fn handle_validate(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let validate_req = json_request::<ValidateRequest>(&mut req).await?;
|
||||
|
||||
let locked = get_state(&req).inner.read().await;
|
||||
|
||||
let mut response = ValidateResponse {
|
||||
tenants: Vec::new(),
|
||||
};
|
||||
|
||||
for req_tenant in validate_req.tenants {
|
||||
// TODO(sharding): make this shard-aware
|
||||
if let Some(tenant_state) = locked.tenants.get(&req_tenant.id.tenant_id) {
|
||||
let valid = tenant_state.generation == req_tenant.gen;
|
||||
tracing::info!(
|
||||
"handle_validate: {}(gen {}): valid={valid} (latest {})",
|
||||
req_tenant.id,
|
||||
req_tenant.gen,
|
||||
tenant_state.generation
|
||||
);
|
||||
response.tenants.push(ValidateResponseTenant {
|
||||
id: req_tenant.id,
|
||||
valid,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
json_response(StatusCode::OK, response)
|
||||
}
|
||||
/// Call into this before attaching a tenant to a pageserver, to acquire a generation number
|
||||
/// (in the real control plane this is unnecessary, because the same program is managing
|
||||
/// generation numbers and doing attachments).
|
||||
async fn handle_attach_hook(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let attach_req = json_request::<AttachHookRequest>(&mut req).await?;
|
||||
|
||||
let state = get_state(&req).inner.clone();
|
||||
let mut locked = state.write().await;
|
||||
|
||||
let tenant_state = locked
|
||||
.tenants
|
||||
.entry(attach_req.tenant_id)
|
||||
.or_insert_with(|| TenantState {
|
||||
pageserver: attach_req.node_id,
|
||||
generation: 0,
|
||||
});
|
||||
|
||||
if let Some(attaching_pageserver) = attach_req.node_id.as_ref() {
|
||||
tenant_state.generation += 1;
|
||||
tracing::info!(
|
||||
tenant_id = %attach_req.tenant_id,
|
||||
ps_id = %attaching_pageserver,
|
||||
generation = %tenant_state.generation,
|
||||
"issuing",
|
||||
);
|
||||
} else if let Some(ps_id) = tenant_state.pageserver {
|
||||
tracing::info!(
|
||||
tenant_id = %attach_req.tenant_id,
|
||||
%ps_id,
|
||||
generation = %tenant_state.generation,
|
||||
"dropping",
|
||||
);
|
||||
} else {
|
||||
tracing::info!(
|
||||
tenant_id = %attach_req.tenant_id,
|
||||
"no-op: tenant already has no pageserver");
|
||||
}
|
||||
tenant_state.pageserver = attach_req.node_id;
|
||||
let generation = tenant_state.generation;
|
||||
|
||||
tracing::info!(
|
||||
"handle_attach_hook: tenant {} set generation {}, pageserver {}",
|
||||
attach_req.tenant_id,
|
||||
tenant_state.generation,
|
||||
attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff))
|
||||
);
|
||||
|
||||
locked.save().await.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
AttachHookResponse {
|
||||
gen: attach_req.node_id.map(|_| generation),
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
async fn handle_inspect(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let inspect_req = json_request::<InspectRequest>(&mut req).await?;
|
||||
|
||||
let state = get_state(&req).inner.clone();
|
||||
let locked = state.write().await;
|
||||
let tenant_state = locked.tenants.get(&inspect_req.tenant_id);
|
||||
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
InspectResponse {
|
||||
attachment: tenant_state.and_then(|s| s.pageserver.map(|ps| (s.generation, ps))),
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
async fn handle_tenant_create(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let inspect_req = json_request::<InspectRequest>(&mut req).await?;
|
||||
|
||||
let state = get_state(&req).inner.clone();
|
||||
let locked = state.write().await;
|
||||
let tenant_state = locked.tenants.get(&inspect_req.tenant_shard_id);
|
||||
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
InspectResponse {
|
||||
attachment: tenant_state.and_then(|s| s.pageserver.map(|ps| (s.generation, ps))),
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
fn make_router(persistent_state: PersistentState) -> RouterBuilder<hyper::Body, ApiError> {
|
||||
endpoint::make_router()
|
||||
.data(Arc::new(State::new(persistent_state)))
|
||||
.post("/re-attach", |r| request_span(r, handle_re_attach))
|
||||
.post("/validate", |r| request_span(r, handle_validate))
|
||||
.post("/attach-hook", |r| request_span(r, handle_attach_hook))
|
||||
.post("/inspect", |r| request_span(r, handle_inspect))
|
||||
.post("/tenant/:tenant_id", |r| {
|
||||
request_span(r, handle_tenant_create)
|
||||
})
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
logging::init(
|
||||
LogFormat::Plain,
|
||||
logging::TracingErrorLayerEnablement::Disabled,
|
||||
logging::Output::Stdout,
|
||||
)?;
|
||||
|
||||
let args = Cli::parse();
|
||||
tracing::info!(
|
||||
"Starting, state at {}, listening on {}",
|
||||
args.path.to_string_lossy(),
|
||||
args.listen
|
||||
);
|
||||
|
||||
let persistent_state = PersistentState::load_or_new(&args.path).await;
|
||||
|
||||
let http_listener = tcp_listener::bind(args.listen)?;
|
||||
let router = make_router(persistent_state)
|
||||
.build()
|
||||
.map_err(|err| anyhow!(err))?;
|
||||
let service = utils::http::RouterService::new(router).unwrap();
|
||||
let server = hyper::Server::from_tcp(http_listener)?.serve(service);
|
||||
|
||||
tracing::info!("Serving on {0}", args.listen);
|
||||
|
||||
tokio::task::spawn(server);
|
||||
|
||||
ShutdownSignals::handle(|signal| match signal {
|
||||
Signal::Interrupt | Signal::Terminate | Signal::Quit => {
|
||||
tracing::info!("Got {}. Terminating", signal.name());
|
||||
// We're just a test helper: no graceful shutdown.
|
||||
std::process::exit(0);
|
||||
}
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -8,14 +8,18 @@
|
||||
use anyhow::{anyhow, bail, Context, Result};
|
||||
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
|
||||
use compute_api::spec::ComputeMode;
|
||||
use control_plane::attachment_service::AttachmentService;
|
||||
use control_plane::attachment_service::{
|
||||
AttachmentService, NodeAvailability, NodeConfigureRequest, NodeSchedulingPolicy,
|
||||
};
|
||||
use control_plane::endpoint::ComputeControlPlane;
|
||||
use control_plane::local_env::LocalEnv;
|
||||
use control_plane::pageserver::{PageServerNode, PAGESERVER_REMOTE_STORAGE_DIR};
|
||||
use control_plane::safekeeper::SafekeeperNode;
|
||||
use control_plane::tenant_migration::migrate_tenant;
|
||||
use control_plane::{broker, local_env};
|
||||
use pageserver_api::models::TimelineInfo;
|
||||
use pageserver_api::models::{
|
||||
ShardParameters, TenantCreateRequest, TimelineCreateRequest, TimelineInfo,
|
||||
};
|
||||
use pageserver_api::shard::{ShardCount, ShardStripeSize, TenantShardId};
|
||||
use pageserver_api::{
|
||||
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
|
||||
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
|
||||
@@ -30,6 +34,7 @@ use std::path::PathBuf;
|
||||
use std::process::exit;
|
||||
use std::str::FromStr;
|
||||
use storage_broker::DEFAULT_LISTEN_ADDR as DEFAULT_BROKER_ADDR;
|
||||
use url::Host;
|
||||
use utils::{
|
||||
auth::{Claims, Scope},
|
||||
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
|
||||
@@ -276,10 +281,10 @@ fn print_timeline(
|
||||
/// Connects to the pageserver to query this information.
|
||||
async fn get_timeline_infos(
|
||||
env: &local_env::LocalEnv,
|
||||
tenant_id: &TenantId,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
) -> Result<HashMap<TimelineId, TimelineInfo>> {
|
||||
Ok(get_default_pageserver(env)
|
||||
.timeline_list(tenant_id)
|
||||
.timeline_list(tenant_shard_id)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|timeline_info| (timeline_info.timeline_id, timeline_info))
|
||||
@@ -297,6 +302,20 @@ fn get_tenant_id(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::R
|
||||
}
|
||||
}
|
||||
|
||||
// Helper function to parse --tenant_id option, for commands that accept a shard suffix
|
||||
fn get_tenant_shard_id(
|
||||
sub_match: &ArgMatches,
|
||||
env: &local_env::LocalEnv,
|
||||
) -> anyhow::Result<TenantShardId> {
|
||||
if let Some(tenant_id_from_arguments) = parse_tenant_shard_id(sub_match).transpose() {
|
||||
tenant_id_from_arguments
|
||||
} else if let Some(default_id) = env.default_tenant_id {
|
||||
Ok(TenantShardId::unsharded(default_id))
|
||||
} else {
|
||||
anyhow::bail!("No tenant shard id. Use --tenant-id, or set a default tenant");
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_tenant_id(sub_match: &ArgMatches) -> anyhow::Result<Option<TenantId>> {
|
||||
sub_match
|
||||
.get_one::<String>("tenant-id")
|
||||
@@ -305,6 +324,14 @@ fn parse_tenant_id(sub_match: &ArgMatches) -> anyhow::Result<Option<TenantId>> {
|
||||
.context("Failed to parse tenant id from the argument string")
|
||||
}
|
||||
|
||||
fn parse_tenant_shard_id(sub_match: &ArgMatches) -> anyhow::Result<Option<TenantShardId>> {
|
||||
sub_match
|
||||
.get_one::<String>("tenant-id")
|
||||
.map(|id_str| TenantShardId::from_str(id_str))
|
||||
.transpose()
|
||||
.context("Failed to parse tenant shard id from the argument string")
|
||||
}
|
||||
|
||||
fn parse_timeline_id(sub_match: &ArgMatches) -> anyhow::Result<Option<TimelineId>> {
|
||||
sub_match
|
||||
.get_one::<String>("timeline-id")
|
||||
@@ -393,47 +420,66 @@ async fn handle_tenant(
|
||||
Some(("create", create_match)) => {
|
||||
let tenant_conf: HashMap<_, _> = create_match
|
||||
.get_many::<String>("config")
|
||||
.map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
|
||||
.map(|vals: clap::parser::ValuesRef<'_, String>| {
|
||||
vals.flat_map(|c| c.split_once(':')).collect()
|
||||
})
|
||||
.unwrap_or_default();
|
||||
|
||||
let shard_count: u8 = create_match
|
||||
.get_one::<u8>("shard-count")
|
||||
.cloned()
|
||||
.unwrap_or(0);
|
||||
|
||||
let shard_stripe_size: Option<u32> =
|
||||
create_match.get_one::<u32>("shard-stripe-size").cloned();
|
||||
|
||||
let tenant_conf = PageServerNode::parse_config(tenant_conf)?;
|
||||
|
||||
// If tenant ID was not specified, generate one
|
||||
let tenant_id = parse_tenant_id(create_match)?.unwrap_or_else(TenantId::generate);
|
||||
|
||||
let generation = if env.control_plane_api.is_some() {
|
||||
// We must register the tenant with the attachment service, so
|
||||
// that when the pageserver restarts, it will be re-attached.
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
attachment_service
|
||||
.attach_hook(tenant_id, pageserver.conf.id)
|
||||
.await?
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
pageserver
|
||||
.tenant_create(tenant_id, generation, tenant_conf)
|
||||
// We must register the tenant with the attachment service, so
|
||||
// that when the pageserver restarts, it will be re-attached.
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
attachment_service
|
||||
.tenant_create(TenantCreateRequest {
|
||||
// Note that ::unsharded here isn't actually because the tenant is unsharded, its because the
|
||||
// attachment service expecfs a shard-naive tenant_id in this attribute, and the TenantCreateRequest
|
||||
// type is used both in attachment service (for creating tenants) and in pageserver (for creating shards)
|
||||
new_tenant_id: TenantShardId::unsharded(tenant_id),
|
||||
generation: None,
|
||||
shard_parameters: ShardParameters {
|
||||
count: ShardCount(shard_count),
|
||||
stripe_size: shard_stripe_size.map(ShardStripeSize),
|
||||
},
|
||||
config: tenant_conf,
|
||||
})
|
||||
.await?;
|
||||
println!("tenant {tenant_id} successfully created on the pageserver");
|
||||
|
||||
// Create an initial timeline for the new tenant
|
||||
let new_timeline_id = parse_timeline_id(create_match)?;
|
||||
let new_timeline_id =
|
||||
parse_timeline_id(create_match)?.unwrap_or(TimelineId::generate());
|
||||
let pg_version = create_match
|
||||
.get_one::<u32>("pg-version")
|
||||
.copied()
|
||||
.context("Failed to parse postgres version from the argument string")?;
|
||||
|
||||
let timeline_info = pageserver
|
||||
.timeline_create(
|
||||
// FIXME: passing None for ancestor_start_lsn is not kosher in a sharded world: we can't have
|
||||
// different shards picking different start lsns. Maybe we have to teach attachment service
|
||||
// to let shard 0 branch first and then propagate the chosen LSN to other shards.
|
||||
attachment_service
|
||||
.tenant_timeline_create(
|
||||
tenant_id,
|
||||
new_timeline_id,
|
||||
None,
|
||||
None,
|
||||
Some(pg_version),
|
||||
None,
|
||||
TimelineCreateRequest {
|
||||
new_timeline_id,
|
||||
ancestor_timeline_id: None,
|
||||
ancestor_start_lsn: None,
|
||||
existing_initdb_timeline_id: None,
|
||||
pg_version: Some(pg_version),
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
let new_timeline_id = timeline_info.timeline_id;
|
||||
let last_record_lsn = timeline_info.last_record_lsn;
|
||||
|
||||
env.register_branch_mapping(
|
||||
DEFAULT_BRANCH_NAME.to_string(),
|
||||
@@ -441,9 +487,7 @@ async fn handle_tenant(
|
||||
new_timeline_id,
|
||||
)?;
|
||||
|
||||
println!(
|
||||
"Created an initial timeline '{new_timeline_id}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
|
||||
);
|
||||
println!("Created an initial timeline '{new_timeline_id}' for tenant: {tenant_id}",);
|
||||
|
||||
if create_match.get_flag("set-default") {
|
||||
println!("Setting tenant {tenant_id} as a default one");
|
||||
@@ -470,14 +514,64 @@ async fn handle_tenant(
|
||||
println!("tenant {tenant_id} successfully configured on the pageserver");
|
||||
}
|
||||
Some(("migrate", matches)) => {
|
||||
let tenant_id = get_tenant_id(matches, env)?;
|
||||
let tenant_shard_id = get_tenant_shard_id(matches, env)?;
|
||||
let new_pageserver = get_pageserver(env, matches)?;
|
||||
let new_pageserver_id = new_pageserver.conf.id;
|
||||
|
||||
migrate_tenant(env, tenant_id, new_pageserver).await?;
|
||||
println!("tenant {tenant_id} migrated to {}", new_pageserver_id);
|
||||
}
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
attachment_service
|
||||
.tenant_migrate(tenant_shard_id, new_pageserver_id)
|
||||
.await?;
|
||||
|
||||
println!("tenant {tenant_shard_id} migrated to {}", new_pageserver_id);
|
||||
}
|
||||
Some(("status", matches)) => {
|
||||
let tenant_id = get_tenant_id(matches, env)?;
|
||||
|
||||
let mut shard_table = comfy_table::Table::new();
|
||||
shard_table.set_header(["Shard", "Pageserver", "Physical Size"]);
|
||||
|
||||
let mut tenant_synthetic_size = None;
|
||||
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
for shard in attachment_service.tenant_locate(tenant_id).await?.shards {
|
||||
let pageserver =
|
||||
PageServerNode::from_env(env, env.get_pageserver_conf(shard.node_id)?);
|
||||
|
||||
let size = pageserver
|
||||
.http_client
|
||||
.tenant_details(shard.shard_id)
|
||||
.await?
|
||||
.tenant_info
|
||||
.current_physical_size
|
||||
.unwrap();
|
||||
|
||||
shard_table.add_row([
|
||||
format!("{}", shard.shard_id.shard_slug()),
|
||||
format!("{}", shard.node_id.0),
|
||||
format!("{} MiB", size / (1024 * 1024)),
|
||||
]);
|
||||
|
||||
if shard.shard_id.is_zero() {
|
||||
tenant_synthetic_size =
|
||||
Some(pageserver.tenant_synthetic_size(shard.shard_id).await?);
|
||||
}
|
||||
}
|
||||
|
||||
let Some(synthetic_size) = tenant_synthetic_size else {
|
||||
bail!("Shard 0 not found")
|
||||
};
|
||||
|
||||
let mut tenant_table = comfy_table::Table::new();
|
||||
tenant_table.add_row(["Tenant ID".to_string(), tenant_id.to_string()]);
|
||||
tenant_table.add_row([
|
||||
"Synthetic size".to_string(),
|
||||
format!("{} MiB", synthetic_size.size.unwrap_or(0) / (1024 * 1024)),
|
||||
]);
|
||||
|
||||
println!("{tenant_table}");
|
||||
println!("{shard_table}");
|
||||
}
|
||||
Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name),
|
||||
None => bail!("no tenant subcommand provided"),
|
||||
}
|
||||
@@ -489,8 +583,10 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
|
||||
|
||||
match timeline_match.subcommand() {
|
||||
Some(("list", list_match)) => {
|
||||
let tenant_id = get_tenant_id(list_match, env)?;
|
||||
let timelines = pageserver.timeline_list(&tenant_id).await?;
|
||||
// TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the attachment service
|
||||
// where shard 0 is attached, and query there.
|
||||
let tenant_shard_id = get_tenant_shard_id(list_match, env)?;
|
||||
let timelines = pageserver.timeline_list(&tenant_shard_id).await?;
|
||||
print_timelines_tree(timelines, env.timeline_name_mappings())?;
|
||||
}
|
||||
Some(("create", create_match)) => {
|
||||
@@ -505,18 +601,19 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
|
||||
.context("Failed to parse postgres version from the argument string")?;
|
||||
|
||||
let new_timeline_id_opt = parse_timeline_id(create_match)?;
|
||||
let new_timeline_id = new_timeline_id_opt.unwrap_or(TimelineId::generate());
|
||||
|
||||
let timeline_info = pageserver
|
||||
.timeline_create(
|
||||
tenant_id,
|
||||
new_timeline_id_opt,
|
||||
None,
|
||||
None,
|
||||
Some(pg_version),
|
||||
None,
|
||||
)
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
let create_req = TimelineCreateRequest {
|
||||
new_timeline_id,
|
||||
ancestor_timeline_id: None,
|
||||
existing_initdb_timeline_id: None,
|
||||
ancestor_start_lsn: None,
|
||||
pg_version: Some(pg_version),
|
||||
};
|
||||
let timeline_info = attachment_service
|
||||
.tenant_timeline_create(tenant_id, create_req)
|
||||
.await?;
|
||||
let new_timeline_id = timeline_info.timeline_id;
|
||||
|
||||
let last_record_lsn = timeline_info.last_record_lsn;
|
||||
env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
|
||||
@@ -574,7 +671,6 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
|
||||
None,
|
||||
pg_version,
|
||||
ComputeMode::Primary,
|
||||
DEFAULT_PAGESERVER_ID,
|
||||
)?;
|
||||
println!("Done");
|
||||
}
|
||||
@@ -598,17 +694,18 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
|
||||
.map(|lsn_str| Lsn::from_str(lsn_str))
|
||||
.transpose()
|
||||
.context("Failed to parse ancestor start Lsn from the request")?;
|
||||
let timeline_info = pageserver
|
||||
.timeline_create(
|
||||
tenant_id,
|
||||
None,
|
||||
start_lsn,
|
||||
Some(ancestor_timeline_id),
|
||||
None,
|
||||
None,
|
||||
)
|
||||
let new_timeline_id = TimelineId::generate();
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
let create_req = TimelineCreateRequest {
|
||||
new_timeline_id,
|
||||
ancestor_timeline_id: Some(ancestor_timeline_id),
|
||||
existing_initdb_timeline_id: None,
|
||||
ancestor_start_lsn: start_lsn,
|
||||
pg_version: None,
|
||||
};
|
||||
let timeline_info = attachment_service
|
||||
.tenant_timeline_create(tenant_id, create_req)
|
||||
.await?;
|
||||
let new_timeline_id = timeline_info.timeline_id;
|
||||
|
||||
let last_record_lsn = timeline_info.last_record_lsn;
|
||||
|
||||
@@ -635,8 +732,10 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
|
||||
|
||||
match sub_name {
|
||||
"list" => {
|
||||
let tenant_id = get_tenant_id(sub_args, env)?;
|
||||
let timeline_infos = get_timeline_infos(env, &tenant_id)
|
||||
// TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the attachment service
|
||||
// where shard 0 is attached, and query there.
|
||||
let tenant_shard_id = get_tenant_shard_id(sub_args, env)?;
|
||||
let timeline_infos = get_timeline_infos(env, &tenant_shard_id)
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
eprintln!("Failed to load timeline info: {}", e);
|
||||
@@ -661,7 +760,7 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
|
||||
for (endpoint_id, endpoint) in cplane
|
||||
.endpoints
|
||||
.iter()
|
||||
.filter(|(_, endpoint)| endpoint.tenant_id == tenant_id)
|
||||
.filter(|(_, endpoint)| endpoint.tenant_id == tenant_shard_id.tenant_id)
|
||||
{
|
||||
let lsn_str = match endpoint.mode {
|
||||
ComputeMode::Static(lsn) => {
|
||||
@@ -680,7 +779,10 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
|
||||
};
|
||||
|
||||
let branch_name = timeline_name_mappings
|
||||
.get(&TenantTimelineId::new(tenant_id, endpoint.timeline_id))
|
||||
.get(&TenantTimelineId::new(
|
||||
tenant_shard_id.tenant_id,
|
||||
endpoint.timeline_id,
|
||||
))
|
||||
.map(|name| name.as_str())
|
||||
.unwrap_or("?");
|
||||
|
||||
@@ -728,13 +830,6 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
|
||||
.copied()
|
||||
.unwrap_or(false);
|
||||
|
||||
let pageserver_id =
|
||||
if let Some(id_str) = sub_args.get_one::<String>("endpoint-pageserver-id") {
|
||||
NodeId(id_str.parse().context("while parsing pageserver id")?)
|
||||
} else {
|
||||
DEFAULT_PAGESERVER_ID
|
||||
};
|
||||
|
||||
let mode = match (lsn, hot_standby) {
|
||||
(Some(lsn), false) => ComputeMode::Static(lsn),
|
||||
(None, true) => ComputeMode::Replica,
|
||||
@@ -762,7 +857,6 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
|
||||
http_port,
|
||||
pg_version,
|
||||
mode,
|
||||
pageserver_id,
|
||||
)?;
|
||||
}
|
||||
"start" => {
|
||||
@@ -805,6 +899,22 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
|
||||
endpoint.timeline_id,
|
||||
)?;
|
||||
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
let locate_result = attachment_service.tenant_locate(endpoint.tenant_id).await?;
|
||||
let pageservers = locate_result
|
||||
.shards
|
||||
.into_iter()
|
||||
.map(|shard| {
|
||||
(
|
||||
Host::parse(&shard.listen_pg_addr)
|
||||
.expect("Attachment service reported bad hostname"),
|
||||
shard.listen_pg_port,
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
assert!(!pageservers.is_empty());
|
||||
let stripe_size = locate_result.shard_params.stripe_size.map(|s| s.0 as usize);
|
||||
|
||||
let ps_conf = env.get_pageserver_conf(pageserver_id)?;
|
||||
let auth_token = if matches!(ps_conf.pg_auth_type, AuthType::NeonJWT) {
|
||||
let claims = Claims::new(Some(endpoint.tenant_id), Scope::Tenant);
|
||||
@@ -816,7 +926,13 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
|
||||
|
||||
println!("Starting existing endpoint {endpoint_id}...");
|
||||
endpoint
|
||||
.start(&auth_token, safekeepers, remote_ext_config)
|
||||
.start(
|
||||
&auth_token,
|
||||
safekeepers,
|
||||
pageservers,
|
||||
remote_ext_config,
|
||||
stripe_size,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
"reconfigure" => {
|
||||
@@ -827,15 +943,31 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
|
||||
.endpoints
|
||||
.get(endpoint_id.as_str())
|
||||
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
|
||||
let pageserver_id =
|
||||
let pageservers =
|
||||
if let Some(id_str) = sub_args.get_one::<String>("endpoint-pageserver-id") {
|
||||
Some(NodeId(
|
||||
id_str.parse().context("while parsing pageserver id")?,
|
||||
))
|
||||
let ps_id = NodeId(id_str.parse().context("while parsing pageserver id")?);
|
||||
let pageserver = PageServerNode::from_env(env, env.get_pageserver_conf(ps_id)?);
|
||||
vec![(
|
||||
pageserver.pg_connection_config.host().clone(),
|
||||
pageserver.pg_connection_config.port(),
|
||||
)]
|
||||
} else {
|
||||
None
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
attachment_service
|
||||
.tenant_locate(endpoint.tenant_id)
|
||||
.await?
|
||||
.shards
|
||||
.into_iter()
|
||||
.map(|shard| {
|
||||
(
|
||||
Host::parse(&shard.listen_pg_addr)
|
||||
.expect("Attachment service reported malformed host"),
|
||||
shard.listen_pg_port,
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
endpoint.reconfigure(pageserver_id).await?;
|
||||
endpoint.reconfigure(pageservers).await?;
|
||||
}
|
||||
"stop" => {
|
||||
let endpoint_id = sub_args
|
||||
@@ -959,6 +1091,21 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
|
||||
}
|
||||
}
|
||||
|
||||
Some(("set-state", subcommand_args)) => {
|
||||
let pageserver = get_pageserver(env, subcommand_args)?;
|
||||
let scheduling = subcommand_args.get_one("scheduling");
|
||||
let availability = subcommand_args.get_one("availability");
|
||||
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
attachment_service
|
||||
.node_configure(NodeConfigureRequest {
|
||||
node_id: pageserver.conf.id,
|
||||
scheduling: scheduling.cloned(),
|
||||
availability: availability.cloned(),
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
Some(("status", subcommand_args)) => {
|
||||
match get_pageserver(env, subcommand_args)?.check_status().await {
|
||||
Ok(_) => println!("Page server is up and running"),
|
||||
@@ -1352,6 +1499,8 @@ fn cli() -> Command {
|
||||
.arg(pg_version_arg.clone())
|
||||
.arg(Arg::new("set-default").long("set-default").action(ArgAction::SetTrue).required(false)
|
||||
.help("Use this tenant in future CLI commands where tenant_id is needed, but not specified"))
|
||||
.arg(Arg::new("shard-count").value_parser(value_parser!(u8)).long("shard-count").action(ArgAction::Set).help("Number of shards in the new tenant (default 1)"))
|
||||
.arg(Arg::new("shard-stripe-size").value_parser(value_parser!(u32)).long("shard-stripe-size").action(ArgAction::Set).help("Sharding stripe size in pages"))
|
||||
)
|
||||
.subcommand(Command::new("set-default").arg(tenant_id_arg.clone().required(true))
|
||||
.about("Set a particular tenant as default in future CLI commands where tenant_id is needed, but not specified"))
|
||||
@@ -1362,6 +1511,9 @@ fn cli() -> Command {
|
||||
.about("Migrate a tenant from one pageserver to another")
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(pageserver_id_arg.clone()))
|
||||
.subcommand(Command::new("status")
|
||||
.about("Human readable summary of the tenant's shards and attachment locations")
|
||||
.arg(tenant_id_arg.clone()))
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("pageserver")
|
||||
@@ -1381,6 +1533,12 @@ fn cli() -> Command {
|
||||
.about("Restart local pageserver")
|
||||
.arg(pageserver_config_args.clone())
|
||||
)
|
||||
.subcommand(Command::new("set-state")
|
||||
.arg(Arg::new("availability").value_parser(value_parser!(NodeAvailability)).long("availability").action(ArgAction::Set).help("Availability state: offline,active"))
|
||||
.arg(Arg::new("scheduling").value_parser(value_parser!(NodeSchedulingPolicy)).long("scheduling").action(ArgAction::Set).help("Scheduling state: draining,pause,filling,active"))
|
||||
.about("Set scheduling or availability state of pageserver node")
|
||||
.arg(pageserver_config_args.clone())
|
||||
)
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("attachment_service")
|
||||
|
||||
@@ -49,10 +49,11 @@ use compute_api::spec::RemoteExtSpec;
|
||||
use nix::sys::signal::kill;
|
||||
use nix::sys::signal::Signal;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use url::Host;
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
|
||||
use crate::attachment_service::AttachmentService;
|
||||
use crate::local_env::LocalEnv;
|
||||
use crate::pageserver::PageServerNode;
|
||||
use crate::postgresql_conf::PostgresConf;
|
||||
|
||||
use compute_api::responses::{ComputeState, ComputeStatus};
|
||||
@@ -69,7 +70,6 @@ pub struct EndpointConf {
|
||||
http_port: u16,
|
||||
pg_version: u32,
|
||||
skip_pg_catalog_updates: bool,
|
||||
pageserver_id: NodeId,
|
||||
}
|
||||
|
||||
//
|
||||
@@ -121,19 +121,14 @@ impl ComputeControlPlane {
|
||||
http_port: Option<u16>,
|
||||
pg_version: u32,
|
||||
mode: ComputeMode,
|
||||
pageserver_id: NodeId,
|
||||
) -> Result<Arc<Endpoint>> {
|
||||
let pg_port = pg_port.unwrap_or_else(|| self.get_port());
|
||||
let http_port = http_port.unwrap_or_else(|| self.get_port() + 1);
|
||||
let pageserver =
|
||||
PageServerNode::from_env(&self.env, self.env.get_pageserver_conf(pageserver_id)?);
|
||||
|
||||
let ep = Arc::new(Endpoint {
|
||||
endpoint_id: endpoint_id.to_owned(),
|
||||
pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), pg_port),
|
||||
http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), http_port),
|
||||
env: self.env.clone(),
|
||||
pageserver,
|
||||
timeline_id,
|
||||
mode,
|
||||
tenant_id,
|
||||
@@ -159,7 +154,6 @@ impl ComputeControlPlane {
|
||||
pg_port,
|
||||
pg_version,
|
||||
skip_pg_catalog_updates: true,
|
||||
pageserver_id,
|
||||
})?,
|
||||
)?;
|
||||
std::fs::write(
|
||||
@@ -218,7 +212,6 @@ pub struct Endpoint {
|
||||
// These are not part of the endpoint as such, but the environment
|
||||
// the endpoint runs in.
|
||||
pub env: LocalEnv,
|
||||
pageserver: PageServerNode,
|
||||
|
||||
// Optimizations
|
||||
skip_pg_catalog_updates: bool,
|
||||
@@ -241,15 +234,11 @@ impl Endpoint {
|
||||
let conf: EndpointConf =
|
||||
serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
|
||||
|
||||
let pageserver =
|
||||
PageServerNode::from_env(env, env.get_pageserver_conf(conf.pageserver_id)?);
|
||||
|
||||
Ok(Endpoint {
|
||||
pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.pg_port),
|
||||
http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.http_port),
|
||||
endpoint_id,
|
||||
env: env.clone(),
|
||||
pageserver,
|
||||
timeline_id: conf.timeline_id,
|
||||
mode: conf.mode,
|
||||
tenant_id: conf.tenant_id,
|
||||
@@ -469,11 +458,21 @@ impl Endpoint {
|
||||
}
|
||||
}
|
||||
|
||||
fn build_pageserver_connstr(pageservers: &[(Host, u16)]) -> String {
|
||||
pageservers
|
||||
.iter()
|
||||
.map(|(host, port)| format!("postgresql://no_user@{host}:{port}"))
|
||||
.collect::<Vec<_>>()
|
||||
.join(",")
|
||||
}
|
||||
|
||||
pub async fn start(
|
||||
&self,
|
||||
auth_token: &Option<String>,
|
||||
safekeepers: Vec<NodeId>,
|
||||
pageservers: Vec<(Host, u16)>,
|
||||
remote_ext_config: Option<&String>,
|
||||
shard_stripe_size: Option<usize>,
|
||||
) -> Result<()> {
|
||||
if self.status() == "running" {
|
||||
anyhow::bail!("The endpoint is already running");
|
||||
@@ -487,13 +486,9 @@ impl Endpoint {
|
||||
std::fs::remove_dir_all(self.pgdata())?;
|
||||
}
|
||||
|
||||
let pageserver_connstring = {
|
||||
let config = &self.pageserver.pg_connection_config;
|
||||
let (host, port) = (config.host(), config.port());
|
||||
let pageserver_connstring = Self::build_pageserver_connstr(&pageservers);
|
||||
assert!(!pageserver_connstring.is_empty());
|
||||
|
||||
// NOTE: avoid spaces in connection string, because it is less error prone if we forward it somewhere.
|
||||
format!("postgresql://no_user@{host}:{port}")
|
||||
};
|
||||
let mut safekeeper_connstrings = Vec::new();
|
||||
if self.mode == ComputeMode::Primary {
|
||||
for sk_id in safekeepers {
|
||||
@@ -543,6 +538,7 @@ impl Endpoint {
|
||||
storage_auth_token: auth_token.clone(),
|
||||
remote_extensions,
|
||||
pgbouncer_settings: None,
|
||||
shard_stripe_size,
|
||||
};
|
||||
let spec_path = self.endpoint_path().join("spec.json");
|
||||
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
|
||||
@@ -665,7 +661,7 @@ impl Endpoint {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn reconfigure(&self, pageserver_id: Option<NodeId>) -> Result<()> {
|
||||
pub async fn reconfigure(&self, mut pageservers: Vec<(Host, u16)>) -> Result<()> {
|
||||
let mut spec: ComputeSpec = {
|
||||
let spec_path = self.endpoint_path().join("spec.json");
|
||||
let file = std::fs::File::open(spec_path)?;
|
||||
@@ -675,25 +671,27 @@ impl Endpoint {
|
||||
let postgresql_conf = self.read_postgresql_conf()?;
|
||||
spec.cluster.postgresql_conf = Some(postgresql_conf);
|
||||
|
||||
if let Some(pageserver_id) = pageserver_id {
|
||||
let endpoint_config_path = self.endpoint_path().join("endpoint.json");
|
||||
let mut endpoint_conf: EndpointConf = {
|
||||
let file = std::fs::File::open(&endpoint_config_path)?;
|
||||
serde_json::from_reader(file)?
|
||||
};
|
||||
endpoint_conf.pageserver_id = pageserver_id;
|
||||
std::fs::write(
|
||||
endpoint_config_path,
|
||||
serde_json::to_string_pretty(&endpoint_conf)?,
|
||||
)?;
|
||||
|
||||
let pageserver =
|
||||
PageServerNode::from_env(&self.env, self.env.get_pageserver_conf(pageserver_id)?);
|
||||
let ps_http_conf = &pageserver.pg_connection_config;
|
||||
let (host, port) = (ps_http_conf.host(), ps_http_conf.port());
|
||||
spec.pageserver_connstring = Some(format!("postgresql://no_user@{host}:{port}"));
|
||||
// If we weren't given explicit pageservers, query the attachment service
|
||||
if pageservers.is_empty() {
|
||||
let attachment_service = AttachmentService::from_env(&self.env);
|
||||
let locate_result = attachment_service.tenant_locate(self.tenant_id).await?;
|
||||
pageservers = locate_result
|
||||
.shards
|
||||
.into_iter()
|
||||
.map(|shard| {
|
||||
(
|
||||
Host::parse(&shard.listen_pg_addr)
|
||||
.expect("Attachment service reported bad hostname"),
|
||||
shard.listen_pg_port,
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
}
|
||||
|
||||
let pageserver_connstr = Self::build_pageserver_connstr(&pageservers);
|
||||
assert!(!pageserver_connstr.is_empty());
|
||||
spec.pageserver_connstring = Some(pageserver_connstr);
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
let response = client
|
||||
.post(format!(
|
||||
|
||||
@@ -14,4 +14,3 @@ pub mod local_env;
|
||||
pub mod pageserver;
|
||||
pub mod postgresql_conf;
|
||||
pub mod safekeeper;
|
||||
pub mod tenant_migration;
|
||||
|
||||
@@ -17,7 +17,9 @@ use std::time::Duration;
|
||||
use anyhow::{bail, Context};
|
||||
use camino::Utf8PathBuf;
|
||||
use futures::SinkExt;
|
||||
use pageserver_api::models::{self, LocationConfig, ShardParameters, TenantInfo, TimelineInfo};
|
||||
use pageserver_api::models::{
|
||||
self, LocationConfig, ShardParameters, TenantHistorySize, TenantInfo, TimelineInfo,
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_client::mgmt_api;
|
||||
use postgres_backend::AuthType;
|
||||
@@ -106,6 +108,16 @@ impl PageServerNode {
|
||||
"control_plane_api='{}'",
|
||||
control_plane_api.as_str()
|
||||
));
|
||||
|
||||
// Attachment service uses the same auth as pageserver: if JWT is enabled
|
||||
// for us, we will also need it to talk to them.
|
||||
if matches!(self.conf.http_auth_type, AuthType::NeonJWT) {
|
||||
let jwt_token = self
|
||||
.env
|
||||
.generate_auth_token(&Claims::new(None, Scope::PageServerApi))
|
||||
.unwrap();
|
||||
overrides.push(format!("control_plane_api_token='{}'", jwt_token));
|
||||
}
|
||||
}
|
||||
|
||||
if !cli_overrides
|
||||
@@ -301,16 +313,8 @@ impl PageServerNode {
|
||||
pub async fn tenant_list(&self) -> mgmt_api::Result<Vec<TenantInfo>> {
|
||||
self.http_client.list_tenants().await
|
||||
}
|
||||
|
||||
pub async fn tenant_create(
|
||||
&self,
|
||||
new_tenant_id: TenantId,
|
||||
generation: Option<u32>,
|
||||
settings: HashMap<&str, &str>,
|
||||
) -> anyhow::Result<TenantId> {
|
||||
let mut settings = settings.clone();
|
||||
|
||||
let config = models::TenantConfig {
|
||||
pub fn parse_config(mut settings: HashMap<&str, &str>) -> anyhow::Result<models::TenantConfig> {
|
||||
let result = models::TenantConfig {
|
||||
checkpoint_distance: settings
|
||||
.remove("checkpoint_distance")
|
||||
.map(|x| x.parse::<u64>())
|
||||
@@ -371,6 +375,20 @@ impl PageServerNode {
|
||||
.context("Failed to parse 'gc_feedback' as bool")?,
|
||||
heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()),
|
||||
};
|
||||
if !settings.is_empty() {
|
||||
bail!("Unrecognized tenant settings: {settings:?}")
|
||||
} else {
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn tenant_create(
|
||||
&self,
|
||||
new_tenant_id: TenantId,
|
||||
generation: Option<u32>,
|
||||
settings: HashMap<&str, &str>,
|
||||
) -> anyhow::Result<TenantId> {
|
||||
let config = Self::parse_config(settings.clone())?;
|
||||
|
||||
let request = models::TenantCreateRequest {
|
||||
new_tenant_id: TenantShardId::unsharded(new_tenant_id),
|
||||
@@ -472,18 +490,21 @@ impl PageServerNode {
|
||||
|
||||
pub async fn location_config(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
config: LocationConfig,
|
||||
flush_ms: Option<Duration>,
|
||||
) -> anyhow::Result<()> {
|
||||
Ok(self
|
||||
.http_client
|
||||
.location_config(tenant_id, config, flush_ms)
|
||||
.location_config(tenant_shard_id, config, flush_ms)
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub async fn timeline_list(&self, tenant_id: &TenantId) -> anyhow::Result<Vec<TimelineInfo>> {
|
||||
Ok(self.http_client.list_timelines(*tenant_id).await?)
|
||||
pub async fn timeline_list(
|
||||
&self,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
) -> anyhow::Result<Vec<TimelineInfo>> {
|
||||
Ok(self.http_client.list_timelines(*tenant_shard_id).await?)
|
||||
}
|
||||
|
||||
pub async fn tenant_secondary_download(&self, tenant_id: &TenantShardId) -> anyhow::Result<()> {
|
||||
@@ -495,15 +516,13 @@ impl PageServerNode {
|
||||
|
||||
pub async fn timeline_create(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
new_timeline_id: Option<TimelineId>,
|
||||
tenant_shard_id: TenantShardId,
|
||||
new_timeline_id: TimelineId,
|
||||
ancestor_start_lsn: Option<Lsn>,
|
||||
ancestor_timeline_id: Option<TimelineId>,
|
||||
pg_version: Option<u32>,
|
||||
existing_initdb_timeline_id: Option<TimelineId>,
|
||||
) -> anyhow::Result<TimelineInfo> {
|
||||
// If timeline ID was not specified, generate one
|
||||
let new_timeline_id = new_timeline_id.unwrap_or(TimelineId::generate());
|
||||
let req = models::TimelineCreateRequest {
|
||||
new_timeline_id,
|
||||
ancestor_start_lsn,
|
||||
@@ -511,7 +530,10 @@ impl PageServerNode {
|
||||
pg_version,
|
||||
existing_initdb_timeline_id,
|
||||
};
|
||||
Ok(self.http_client.timeline_create(tenant_id, &req).await?)
|
||||
Ok(self
|
||||
.http_client
|
||||
.timeline_create(tenant_shard_id, &req)
|
||||
.await?)
|
||||
}
|
||||
|
||||
/// Import a basebackup prepared using either:
|
||||
@@ -589,4 +611,14 @@ impl PageServerNode {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn tenant_synthetic_size(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
) -> anyhow::Result<TenantHistorySize> {
|
||||
Ok(self
|
||||
.http_client
|
||||
.tenant_synthetic_size(tenant_shard_id)
|
||||
.await?)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,220 +0,0 @@
|
||||
//!
|
||||
//! Functionality for migrating tenants across pageservers: unlike most of neon_local, this code
|
||||
//! isn't scoped to a particular physical service, as it needs to update compute endpoints to
|
||||
//! point to the new pageserver.
|
||||
//!
|
||||
use crate::local_env::LocalEnv;
|
||||
use crate::{
|
||||
attachment_service::AttachmentService, endpoint::ComputeControlPlane,
|
||||
pageserver::PageServerNode,
|
||||
};
|
||||
use pageserver_api::models::{
|
||||
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
use utils::{
|
||||
id::{TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
/// Given an attached pageserver, retrieve the LSN for all timelines
|
||||
async fn get_lsns(
|
||||
tenant_id: TenantId,
|
||||
pageserver: &PageServerNode,
|
||||
) -> anyhow::Result<HashMap<TimelineId, Lsn>> {
|
||||
let timelines = pageserver.timeline_list(&tenant_id).await?;
|
||||
Ok(timelines
|
||||
.into_iter()
|
||||
.map(|t| (t.timeline_id, t.last_record_lsn))
|
||||
.collect())
|
||||
}
|
||||
|
||||
/// Wait for the timeline LSNs on `pageserver` to catch up with or overtake
|
||||
/// `baseline`.
|
||||
async fn await_lsn(
|
||||
tenant_id: TenantId,
|
||||
pageserver: &PageServerNode,
|
||||
baseline: HashMap<TimelineId, Lsn>,
|
||||
) -> anyhow::Result<()> {
|
||||
loop {
|
||||
let latest = match get_lsns(tenant_id, pageserver).await {
|
||||
Ok(l) => l,
|
||||
Err(_e) => {
|
||||
println!(
|
||||
"🕑 Waiting for pageserver {} to activate...",
|
||||
pageserver.conf.id
|
||||
);
|
||||
std::thread::sleep(Duration::from_millis(500));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let mut any_behind: bool = false;
|
||||
for (timeline_id, baseline_lsn) in &baseline {
|
||||
match latest.get(timeline_id) {
|
||||
Some(latest_lsn) => {
|
||||
println!("🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}");
|
||||
if latest_lsn < baseline_lsn {
|
||||
any_behind = true;
|
||||
}
|
||||
}
|
||||
None => {
|
||||
// Expected timeline isn't yet visible on migration destination.
|
||||
// (IRL we would have to account for timeline deletion, but this
|
||||
// is just test helper)
|
||||
any_behind = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !any_behind {
|
||||
println!("✅ LSN caught up. Proceeding...");
|
||||
break;
|
||||
} else {
|
||||
std::thread::sleep(Duration::from_millis(500));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// This function spans multiple services, to demonstrate live migration of a tenant
|
||||
/// between pageservers:
|
||||
/// - Coordinate attach/secondary/detach on pageservers
|
||||
/// - call into attachment_service for generations
|
||||
/// - reconfigure compute endpoints to point to new attached pageserver
|
||||
pub async fn migrate_tenant(
|
||||
env: &LocalEnv,
|
||||
tenant_id: TenantId,
|
||||
dest_ps: PageServerNode,
|
||||
) -> anyhow::Result<()> {
|
||||
println!("🤔 Checking existing status...");
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
|
||||
fn build_location_config(
|
||||
mode: LocationConfigMode,
|
||||
generation: Option<u32>,
|
||||
secondary_conf: Option<LocationConfigSecondary>,
|
||||
) -> LocationConfig {
|
||||
LocationConfig {
|
||||
mode,
|
||||
generation,
|
||||
secondary_conf,
|
||||
tenant_conf: TenantConfig::default(),
|
||||
shard_number: 0,
|
||||
shard_count: 0,
|
||||
shard_stripe_size: 0,
|
||||
}
|
||||
}
|
||||
|
||||
let previous = attachment_service.inspect(tenant_id).await?;
|
||||
let mut baseline_lsns = None;
|
||||
if let Some((generation, origin_ps_id)) = &previous {
|
||||
let origin_ps = PageServerNode::from_env(env, env.get_pageserver_conf(*origin_ps_id)?);
|
||||
|
||||
if origin_ps_id == &dest_ps.conf.id {
|
||||
println!("🔁 Already attached to {origin_ps_id}, freshening...");
|
||||
let gen = attachment_service
|
||||
.attach_hook(tenant_id, dest_ps.conf.id)
|
||||
.await?;
|
||||
let dest_conf = build_location_config(LocationConfigMode::AttachedSingle, gen, None);
|
||||
dest_ps.location_config(tenant_id, dest_conf, None).await?;
|
||||
println!("✅ Migration complete");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
println!("🔁 Switching origin pageserver {origin_ps_id} to stale mode");
|
||||
|
||||
let stale_conf =
|
||||
build_location_config(LocationConfigMode::AttachedStale, Some(*generation), None);
|
||||
origin_ps
|
||||
.location_config(tenant_id, stale_conf, Some(Duration::from_secs(10)))
|
||||
.await?;
|
||||
|
||||
baseline_lsns = Some(get_lsns(tenant_id, &origin_ps).await?);
|
||||
}
|
||||
|
||||
println!(
|
||||
"🔁 Downloading latest layers to destination pageserver {}",
|
||||
dest_ps.conf.id
|
||||
);
|
||||
match dest_ps
|
||||
.tenant_secondary_download(&TenantShardId::unsharded(tenant_id))
|
||||
.await
|
||||
{
|
||||
Ok(()) => {}
|
||||
Err(_) => {
|
||||
println!(" (skipping, destination wasn't in secondary mode)")
|
||||
}
|
||||
}
|
||||
|
||||
let gen = attachment_service
|
||||
.attach_hook(tenant_id, dest_ps.conf.id)
|
||||
.await?;
|
||||
let dest_conf = build_location_config(LocationConfigMode::AttachedMulti, gen, None);
|
||||
|
||||
println!("🔁 Attaching to pageserver {}", dest_ps.conf.id);
|
||||
dest_ps.location_config(tenant_id, dest_conf, None).await?;
|
||||
|
||||
if let Some(baseline) = baseline_lsns {
|
||||
println!("🕑 Waiting for LSN to catch up...");
|
||||
await_lsn(tenant_id, &dest_ps, baseline).await?;
|
||||
}
|
||||
|
||||
let cplane = ComputeControlPlane::load(env.clone())?;
|
||||
for (endpoint_name, endpoint) in &cplane.endpoints {
|
||||
if endpoint.tenant_id == tenant_id {
|
||||
println!(
|
||||
"🔁 Reconfiguring endpoint {} to use pageserver {}",
|
||||
endpoint_name, dest_ps.conf.id
|
||||
);
|
||||
endpoint.reconfigure(Some(dest_ps.conf.id)).await?;
|
||||
}
|
||||
}
|
||||
|
||||
for other_ps_conf in &env.pageservers {
|
||||
if other_ps_conf.id == dest_ps.conf.id {
|
||||
continue;
|
||||
}
|
||||
|
||||
let other_ps = PageServerNode::from_env(env, other_ps_conf);
|
||||
let other_ps_tenants = other_ps.tenant_list().await?;
|
||||
|
||||
// Check if this tenant is attached
|
||||
let found = other_ps_tenants
|
||||
.into_iter()
|
||||
.map(|t| t.id)
|
||||
.any(|i| i.tenant_id == tenant_id);
|
||||
if !found {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Downgrade to a secondary location
|
||||
let secondary_conf = build_location_config(
|
||||
LocationConfigMode::Secondary,
|
||||
None,
|
||||
Some(LocationConfigSecondary { warm: true }),
|
||||
);
|
||||
|
||||
println!(
|
||||
"💤 Switching to secondary mode on pageserver {}",
|
||||
other_ps.conf.id
|
||||
);
|
||||
other_ps
|
||||
.location_config(tenant_id, secondary_conf, None)
|
||||
.await?;
|
||||
}
|
||||
|
||||
println!(
|
||||
"🔁 Switching to AttachedSingle mode on pageserver {}",
|
||||
dest_ps.conf.id
|
||||
);
|
||||
let dest_conf = build_location_config(LocationConfigMode::AttachedSingle, gen, None);
|
||||
dest_ps.location_config(tenant_id, dest_conf, None).await?;
|
||||
|
||||
println!("✅ Migration complete");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Reference in New Issue
Block a user