control_plane: rebase fixes

This commit is contained in:
John Spray
2023-12-18 14:54:59 +00:00
parent 495c3d70f3
commit 3835a51429
3 changed files with 82 additions and 44 deletions

View File

@@ -119,10 +119,12 @@ impl AttachmentService {
&self.env.attachment_service_bin(),
["-l", &self.listen, "-p", &path_str],
[],
background_process::InitialPidFile::Create(&self.pid_file()),
|| match self.status() {
Ok(_) => Ok(true),
Err(_) => Ok(false),
background_process::InitialPidFile::Create(self.pid_file()),
|| async {
match self.status().await {
Ok(_) => Ok(true),
Err(_) => Ok(false),
}
},
)
.await;
@@ -138,7 +140,8 @@ impl AttachmentService {
listen_pg_port: pg_port.unwrap_or(5432),
listen_http_addr: http_host.to_string(),
listen_http_port: http_port.unwrap_or(80),
})?;
})
.await?;
}
result
@@ -149,7 +152,7 @@ impl AttachmentService {
}
/// Simple HTTP request wrapper for calling into attachment service
fn dispatch<RQ, RS>(
async fn dispatch<RQ, RS>(
&self,
method: hyper::Method,
path: String,
@@ -172,7 +175,7 @@ impl AttachmentService {
builder = builder.json(&body)
}
let response = builder.send()?;
let response = builder.send().await?;
if response.status() != StatusCode::OK {
return Err(anyhow!(
"Unexpected status {} on {}",
@@ -181,7 +184,7 @@ impl AttachmentService {
));
}
Ok(response.json()?)
Ok(response.json().await?)
}
/// Call into the attach_hook API, for use before handing out attachments to pageservers
@@ -238,36 +241,48 @@ impl AttachmentService {
}
#[instrument(skip(self))]
pub fn tenant_create(&self, req: TenantCreateRequest) -> anyhow::Result<TenantCreateResponse> {
pub async fn tenant_create(
&self,
req: TenantCreateRequest,
) -> anyhow::Result<TenantCreateResponse> {
self.dispatch(Method::POST, "tenant".to_string(), Some(req))
.await
}
#[instrument(skip(self))]
pub fn tenant_locate(&self, tenant_id: TenantId) -> anyhow::Result<TenantLocateResponse> {
pub async fn tenant_locate(&self, tenant_id: TenantId) -> anyhow::Result<TenantLocateResponse> {
self.dispatch::<(), _>(Method::GET, format!("tenant/{tenant_id}/locate"), None)
.await
}
#[instrument(skip(self), fields(%tenant_id, %new_shard_count))]
pub fn tenant_split(&self, tenant_id: TenantId, new_shard_count: u8) -> anyhow::Result<()> {
pub async fn tenant_split(
&self,
tenant_id: TenantId,
new_shard_count: u8,
) -> anyhow::Result<()> {
self.dispatch::<_, ()>(
Method::POST,
format!("tenant/{tenant_id}/shard_split"),
Some(TenantShardSplitRequest { new_shard_count }),
)
.await
}
#[instrument(skip_all, fields(node_id=%req.node_id))]
pub fn node_register(&self, req: NodeRegisterRequest) -> anyhow::Result<()> {
pub async fn node_register(&self, req: NodeRegisterRequest) -> anyhow::Result<()> {
self.dispatch::<_, ()>(Method::POST, "node".to_string(), Some(req))
.await
}
#[instrument(skip(self))]
pub fn status(&self) -> anyhow::Result<()> {
pub async fn status(&self) -> anyhow::Result<()> {
self.dispatch::<(), ()>(Method::GET, "status".to_string(), None)
.await
}
#[instrument(skip_all, fields(%tenant_id, timeline_id=%req.new_timeline_id))]
pub fn tenant_timeline_create(
pub async fn tenant_timeline_create(
&self,
tenant_id: TenantId,
req: TimelineCreateRequest,
@@ -277,5 +292,6 @@ impl AttachmentService {
format!("tenant/{tenant_id}/timeline"),
Some(req),
)
.await
}
}

View File

@@ -440,18 +440,20 @@ async fn handle_tenant(
// We must register the tenant with the attachment service, so
// that when the pageserver restarts, it will be re-attached.
let attachment_service = AttachmentService::from_env(env);
attachment_service.tenant_create(TenantCreateRequest {
// Note that ::unsharded here isn't actually because the tenant is unsharded, its because the
// attachment service expecfs a shard-naive tenant_id in this attribute, and the TenantCreateRequest
// type is used both in attachment service (for creating tenants) and in pageserver (for creating shards)
new_tenant_id: TenantShardId::unsharded(tenant_id),
generation: None,
shard_parameters: ShardParameters {
count: ShardCount(shard_count),
stripe_size: shard_stripe_size.map(ShardStripeSize),
},
config: tenant_conf,
})?;
attachment_service
.tenant_create(TenantCreateRequest {
// Note that ::unsharded here isn't actually because the tenant is unsharded, its because the
// attachment service expecfs a shard-naive tenant_id in this attribute, and the TenantCreateRequest
// type is used both in attachment service (for creating tenants) and in pageserver (for creating shards)
new_tenant_id: TenantShardId::unsharded(tenant_id),
generation: None,
shard_parameters: ShardParameters {
count: ShardCount(shard_count),
stripe_size: shard_stripe_size.map(ShardStripeSize),
},
config: tenant_conf,
})
.await?;
println!("tenant {tenant_id} successfully created on the pageserver");
// Create an initial timeline for the new tenant
@@ -465,16 +467,18 @@ async fn handle_tenant(
// FIXME: passing None for ancestor_start_lsn is not kosher in a sharded world: we can't have
// different shards picking different start lsns. Maybe we have to teach attachment service
// to let shard 0 branch first and then propagate the chosen LSN to other shards.
attachment_service.tenant_timeline_create(
tenant_id,
TimelineCreateRequest {
new_timeline_id,
ancestor_timeline_id: None,
ancestor_start_lsn: None,
existing_initdb_timeline_id: None,
pg_version: Some(pg_version),
},
)?;
attachment_service
.tenant_timeline_create(
tenant_id,
TimelineCreateRequest {
new_timeline_id,
ancestor_timeline_id: None,
ancestor_start_lsn: None,
existing_initdb_timeline_id: None,
pg_version: Some(pg_version),
},
)
.await?;
env.register_branch_mapping(
DEFAULT_BRANCH_NAME.to_string(),
@@ -519,13 +523,15 @@ async fn handle_tenant(
Some(("split", matches)) => {
let tenant_id = get_tenant_id(matches, env)?;
let attachment_service = AttachmentService::from_env(env);
let old_shards = attachment_service.tenant_locate(tenant_id)?.shards;
let old_shards = attachment_service.tenant_locate(tenant_id).await?.shards;
let new_shard_count = old_shards.len() * 2;
if old_shards.len() > 127 {
bail!("Cannot split further");
}
attachment_service.tenant_split(tenant_id, new_shard_count as u8)?;
attachment_service
.tenant_split(tenant_id, new_shard_count as u8)
.await?;
println!("Split {}->{}", old_shards.len(), new_shard_count);
}
Some(("status", matches)) => {
@@ -537,7 +543,7 @@ async fn handle_tenant(
let mut tenant_synthetic_size = None;
let attachment_service = AttachmentService::from_env(env);
for shard in attachment_service.tenant_locate(tenant_id)?.shards {
for shard in attachment_service.tenant_locate(tenant_id).await?.shards {
let pageserver =
PageServerNode::from_env(env, env.get_pageserver_conf(shard.node_id)?);
@@ -555,7 +561,8 @@ async fn handle_tenant(
]);
if shard.shard_id.is_zero() {
tenant_synthetic_size = Some(pageserver.tenant_synthetic_size(shard.shard_id)?);
tenant_synthetic_size =
Some(pageserver.tenant_synthetic_size(shard.shard_id).await?);
}
}
@@ -903,7 +910,8 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
let attachment_service = AttachmentService::from_env(env);
let pageservers = attachment_service
.tenant_locate(endpoint.tenant_id)?
.tenant_locate(endpoint.tenant_id)
.await?
.shards
.into_iter()
.map(|shard| {
@@ -948,7 +956,8 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
} else {
let attachment_service = AttachmentService::from_env(env);
attachment_service
.tenant_locate(endpoint.tenant_id)?
.tenant_locate(endpoint.tenant_id)
.await?
.shards
.into_iter()
.map(|shard| {

View File

@@ -491,8 +491,11 @@ impl PageServerNode {
.await?)
}
pub async fn timeline_list(&self, tenant_id: &TenantId) -> anyhow::Result<Vec<TimelineInfo>> {
Ok(self.http_client.list_timelines(*tenant_id).await?)
pub async fn timeline_list(
&self,
tenant_shard_id: &TenantShardId,
) -> anyhow::Result<Vec<TimelineInfo>> {
Ok(self.http_client.list_timelines(*tenant_shard_id).await?)
}
pub async fn timeline_create(
@@ -589,4 +592,14 @@ impl PageServerNode {
Ok(())
}
pub async fn tenant_synthetic_size(
&self,
tenant_shard_id: TenantShardId,
) -> anyhow::Result<TenantHistorySize> {
Ok(self
.http_client
.tenant_synthetic_size(tenant_shard_id)
.await?)
}
}