neon_local: attachment service status, refactors

This commit is contained in:
John Spray
2023-12-08 10:09:19 +00:00
parent 58f64339f3
commit 347bd012b3
3 changed files with 106 additions and 55 deletions

View File

@@ -1,13 +1,15 @@
use crate::{background_process, local_env::LocalEnv};
use anyhow::anyhow;
use camino::Utf8PathBuf;
use hyper::StatusCode;
use hyper::{Method, StatusCode};
use pageserver_api::{
models::{TenantCreateRequest, TimelineCreateRequest},
shard::TenantShardId,
};
use serde::{Deserialize, Serialize};
use postgres_connection::parse_host_port;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{path::PathBuf, process::Child};
use tracing::instrument;
use utils::id::{NodeId, TenantId};
pub struct AttachmentService {
@@ -110,24 +112,79 @@ impl AttachmentService {
pub async fn start(&self) -> anyhow::Result<Child> {
let path_str = self.path.to_string_lossy();
background_process::start_process(
let result = background_process::start_process(
COMMAND,
&self.env.base_data_dir,
&self.env.attachment_service_bin(),
["-l", &self.listen, "-p", &path_str],
[],
background_process::InitialPidFile::Create(self.pid_file()),
// TODO: a real status check
|| async move { anyhow::Ok(true) },
background_process::InitialPidFile::Create(&self.pid_file()),
|| match self.status() {
Ok(_) => Ok(true),
Err(_) => Ok(false),
},
)
.await
.await;
for ps_conf in &self.env.pageservers {
let (pg_host, pg_port) =
parse_host_port(&ps_conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
let (http_host, http_port) = parse_host_port(&ps_conf.listen_http_addr)
.expect("Unable to parse listen_http_addr");
self.node_register(NodeRegisterRequest {
node_id: ps_conf.id,
listen_pg_addr: pg_host.to_string(),
listen_pg_port: pg_port.unwrap_or(5432),
listen_http_addr: http_host.to_string(),
listen_http_port: http_port.unwrap_or(80),
})?;
}
result
}
pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
background_process::stop_process(immediate, COMMAND, &self.pid_file())
}
/// Simple HTTP request wrapper for calling into attachment service
fn dispatch<RQ, RS>(
&self,
method: hyper::Method,
path: String,
body: Option<RQ>,
) -> anyhow::Result<RS>
where
RQ: Serialize + Sized,
RS: DeserializeOwned + Sized,
{
let url = self
.env
.control_plane_api
.clone()
.unwrap()
.join(&path)
.unwrap();
let mut builder = self.client.request(method, url);
if let Some(body) = body {
builder = builder.json(&body)
}
let response = builder.send()?;
if response.status() != StatusCode::OK {
return Err(anyhow!(
"Unexpected status {} on {}",
response.status(),
path
));
}
Ok(response.json()?)
}
/// Call into the attach_hook API, for use before handing out attachments to pageservers
#[instrument(skip(self))]
pub async fn attach_hook(
&self,
tenant_shard_id: TenantShardId,
@@ -155,12 +212,11 @@ impl AttachmentService {
Ok(response.gen)
}
#[instrument(skip(self))]
pub async fn inspect(
&self,
tenant_shard_id: TenantShardId,
) -> anyhow::Result<Option<(u32, NodeId)>> {
use hyper::StatusCode;
let url = self
.env
.control_plane_api
@@ -180,58 +236,36 @@ impl AttachmentService {
Ok(response.attachment)
}
pub fn tenant_create(&self, req: TenantCreateRequest) -> anyhow::Result<()> {
let url = self
.env
.control_plane_api
.clone()
.unwrap()
.join("tenant")
.unwrap();
let response = self.client.post(url).json(&req).send()?;
if response.status() != StatusCode::OK {
return Err(anyhow!("Unexpected status {}", response.status()));
}
Ok(())
#[instrument(skip(self))]
pub fn tenant_create(&self, req: TenantCreateRequest) -> anyhow::Result<TenantCreateResponse> {
self.dispatch(Method::POST, "tenant".to_string(), Some(req))
}
#[instrument(skip(self))]
pub fn tenant_locate(&self, tenant_id: TenantId) -> anyhow::Result<TenantLocateResponse> {
let url = self
.env
.control_plane_api
.clone()
.unwrap()
.join(&format!("tenant/{tenant_id}"))
.unwrap();
let response = self.client.get(url).send()?;
if response.status() != StatusCode::OK {
return Err(anyhow!("Unexpected status {}", response.status()));
}
Ok(response.json()?)
self.dispatch::<(), _>(Method::GET, format!("tenant/{tenant_id}/locate"), None)
}
#[instrument(skip_all, fields(node_id=%req.node_id))]
pub fn node_register(&self, req: NodeRegisterRequest) -> anyhow::Result<()> {
self.dispatch::<_, ()>(Method::POST, "node".to_string(), Some(req))
}
#[instrument(skip(self))]
pub fn status(&self) -> anyhow::Result<()> {
self.dispatch::<(), ()>(Method::GET, "status".to_string(), None)
}
#[instrument(skip_all, fields(%tenant_id, timeline_id=%req.new_timeline_id))]
pub fn tenant_timeline_create(
&self,
tenant_id: TenantId,
req: TimelineCreateRequest,
) -> anyhow::Result<()> {
let url = self
.env
.control_plane_api
.clone()
.unwrap()
.join(&format!("tenant/{tenant_id}/timeline"))
.unwrap();
let response = self.client.post(url).json(&req).send()?;
if response.status() != StatusCode::OK {
return Err(anyhow!("Unexpected status {}", response.status()));
}
Ok(())
self.dispatch(
Method::POST,
format!("tenant/{tenant_id}/timeline"),
Some(req),
)
}
}

View File

@@ -715,9 +715,15 @@ async fn handle_node_register(mut req: Request<Body>) -> Result<Response<Body>,
json_response(StatusCode::OK, ())
}
/// Status endpoint is just used for checking that our HTTP listener is up
async fn handle_status(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
json_response(StatusCode::OK, ())
}
fn make_router(persistent_state: PersistentState) -> RouterBuilder<hyper::Body, ApiError> {
endpoint::make_router()
.data(Arc::new(State::new(persistent_state)))
.get("/status", |r| request_span(r, handle_status))
.post("/re-attach", |r| request_span(r, handle_re_attach))
.post("/validate", |r| request_span(r, handle_validate))
.post("/attach-hook", |r| request_span(r, handle_attach_hook))

View File

@@ -18,7 +18,7 @@ use control_plane::{broker, local_env};
use pageserver_api::models::{
ShardParameters, TenantCreateRequest, TimelineCreateRequest, TimelineInfo,
};
use pageserver_api::shard::TenantShardId;
use pageserver_api::shard::{ShardCount, TenantShardId};
use pageserver_api::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
@@ -418,9 +418,16 @@ async fn handle_tenant(
Some(("create", create_match)) => {
let tenant_conf: HashMap<_, _> = create_match
.get_many::<String>("config")
.map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
.map(|vals: clap::parser::ValuesRef<'_, String>| {
vals.flat_map(|c| c.split_once(':')).collect()
})
.unwrap_or_default();
let shard_count: u8 = create_match
.get_one::<u8>("shard-count")
.cloned()
.unwrap_or(1);
let tenant_conf = PageServerNode::parse_config(tenant_conf)?;
// If tenant ID was not specified, generate one
@@ -435,7 +442,10 @@ async fn handle_tenant(
// type is used both in attachment service (for creating tenants) and in pageserver (for creating shards)
new_tenant_id: TenantShardId::unsharded(tenant_id),
generation: None,
shard_parameters: ShardParameters::default(),
shard_parameters: ShardParameters {
count: ShardCount(shard_count),
stripe_size: None,
},
config: tenant_conf,
})?;
println!("tenant {tenant_id} successfully created on the pageserver");
@@ -1392,6 +1402,7 @@ fn cli() -> Command {
.arg(pg_version_arg.clone())
.arg(Arg::new("set-default").long("set-default").action(ArgAction::SetTrue).required(false)
.help("Use this tenant in future CLI commands where tenant_id is needed, but not specified"))
.arg(Arg::new("shard-count").value_parser(value_parser!(u8)).long("shard-count").action(ArgAction::Set).help("Number of shards in the new tenant (default 1)"))
)
.subcommand(Command::new("set-default").arg(tenant_id_arg.clone().required(true))
.about("Set a particular tenant as default in future CLI commands where tenant_id is needed, but not specified"))