diff --git a/control_plane/src/attachment_service.rs b/control_plane/src/attachment_service.rs index 331bb00e2c..7361c5b447 100644 --- a/control_plane/src/attachment_service.rs +++ b/control_plane/src/attachment_service.rs @@ -119,10 +119,12 @@ impl AttachmentService { &self.env.attachment_service_bin(), ["-l", &self.listen, "-p", &path_str], [], - background_process::InitialPidFile::Create(&self.pid_file()), - || match self.status() { - Ok(_) => Ok(true), - Err(_) => Ok(false), + background_process::InitialPidFile::Create(self.pid_file()), + || async { + match self.status().await { + Ok(_) => Ok(true), + Err(_) => Ok(false), + } }, ) .await; @@ -138,7 +140,8 @@ impl AttachmentService { listen_pg_port: pg_port.unwrap_or(5432), listen_http_addr: http_host.to_string(), listen_http_port: http_port.unwrap_or(80), - })?; + }) + .await?; } result @@ -149,7 +152,7 @@ impl AttachmentService { } /// Simple HTTP request wrapper for calling into attachment service - fn dispatch( + async fn dispatch( &self, method: hyper::Method, path: String, @@ -172,7 +175,7 @@ impl AttachmentService { builder = builder.json(&body) } - let response = builder.send()?; + let response = builder.send().await?; if response.status() != StatusCode::OK { return Err(anyhow!( "Unexpected status {} on {}", @@ -181,7 +184,7 @@ impl AttachmentService { )); } - Ok(response.json()?) + Ok(response.json().await?) } /// Call into the attach_hook API, for use before handing out attachments to pageservers @@ -238,36 +241,48 @@ impl AttachmentService { } #[instrument(skip(self))] - pub fn tenant_create(&self, req: TenantCreateRequest) -> anyhow::Result { + pub async fn tenant_create( + &self, + req: TenantCreateRequest, + ) -> anyhow::Result { self.dispatch(Method::POST, "tenant".to_string(), Some(req)) + .await } #[instrument(skip(self))] - pub fn tenant_locate(&self, tenant_id: TenantId) -> anyhow::Result { + pub async fn tenant_locate(&self, tenant_id: TenantId) -> anyhow::Result { self.dispatch::<(), _>(Method::GET, format!("tenant/{tenant_id}/locate"), None) + .await } #[instrument(skip(self), fields(%tenant_id, %new_shard_count))] - pub fn tenant_split(&self, tenant_id: TenantId, new_shard_count: u8) -> anyhow::Result<()> { + pub async fn tenant_split( + &self, + tenant_id: TenantId, + new_shard_count: u8, + ) -> anyhow::Result<()> { self.dispatch::<_, ()>( Method::POST, format!("tenant/{tenant_id}/shard_split"), Some(TenantShardSplitRequest { new_shard_count }), ) + .await } #[instrument(skip_all, fields(node_id=%req.node_id))] - pub fn node_register(&self, req: NodeRegisterRequest) -> anyhow::Result<()> { + pub async fn node_register(&self, req: NodeRegisterRequest) -> anyhow::Result<()> { self.dispatch::<_, ()>(Method::POST, "node".to_string(), Some(req)) + .await } #[instrument(skip(self))] - pub fn status(&self) -> anyhow::Result<()> { + pub async fn status(&self) -> anyhow::Result<()> { self.dispatch::<(), ()>(Method::GET, "status".to_string(), None) + .await } #[instrument(skip_all, fields(%tenant_id, timeline_id=%req.new_timeline_id))] - pub fn tenant_timeline_create( + pub async fn tenant_timeline_create( &self, tenant_id: TenantId, req: TimelineCreateRequest, @@ -277,5 +292,6 @@ impl AttachmentService { format!("tenant/{tenant_id}/timeline"), Some(req), ) + .await } } diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index f4d7956008..b0d7997f06 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -440,18 +440,20 @@ async fn handle_tenant( // We must register the tenant with the attachment service, so // that when the pageserver restarts, it will be re-attached. let attachment_service = AttachmentService::from_env(env); - attachment_service.tenant_create(TenantCreateRequest { - // Note that ::unsharded here isn't actually because the tenant is unsharded, its because the - // attachment service expecfs a shard-naive tenant_id in this attribute, and the TenantCreateRequest - // type is used both in attachment service (for creating tenants) and in pageserver (for creating shards) - new_tenant_id: TenantShardId::unsharded(tenant_id), - generation: None, - shard_parameters: ShardParameters { - count: ShardCount(shard_count), - stripe_size: shard_stripe_size.map(ShardStripeSize), - }, - config: tenant_conf, - })?; + attachment_service + .tenant_create(TenantCreateRequest { + // Note that ::unsharded here isn't actually because the tenant is unsharded, its because the + // attachment service expecfs a shard-naive tenant_id in this attribute, and the TenantCreateRequest + // type is used both in attachment service (for creating tenants) and in pageserver (for creating shards) + new_tenant_id: TenantShardId::unsharded(tenant_id), + generation: None, + shard_parameters: ShardParameters { + count: ShardCount(shard_count), + stripe_size: shard_stripe_size.map(ShardStripeSize), + }, + config: tenant_conf, + }) + .await?; println!("tenant {tenant_id} successfully created on the pageserver"); // Create an initial timeline for the new tenant @@ -465,16 +467,18 @@ async fn handle_tenant( // FIXME: passing None for ancestor_start_lsn is not kosher in a sharded world: we can't have // different shards picking different start lsns. Maybe we have to teach attachment service // to let shard 0 branch first and then propagate the chosen LSN to other shards. - attachment_service.tenant_timeline_create( - tenant_id, - TimelineCreateRequest { - new_timeline_id, - ancestor_timeline_id: None, - ancestor_start_lsn: None, - existing_initdb_timeline_id: None, - pg_version: Some(pg_version), - }, - )?; + attachment_service + .tenant_timeline_create( + tenant_id, + TimelineCreateRequest { + new_timeline_id, + ancestor_timeline_id: None, + ancestor_start_lsn: None, + existing_initdb_timeline_id: None, + pg_version: Some(pg_version), + }, + ) + .await?; env.register_branch_mapping( DEFAULT_BRANCH_NAME.to_string(), @@ -519,13 +523,15 @@ async fn handle_tenant( Some(("split", matches)) => { let tenant_id = get_tenant_id(matches, env)?; let attachment_service = AttachmentService::from_env(env); - let old_shards = attachment_service.tenant_locate(tenant_id)?.shards; + let old_shards = attachment_service.tenant_locate(tenant_id).await?.shards; let new_shard_count = old_shards.len() * 2; if old_shards.len() > 127 { bail!("Cannot split further"); } - attachment_service.tenant_split(tenant_id, new_shard_count as u8)?; + attachment_service + .tenant_split(tenant_id, new_shard_count as u8) + .await?; println!("Split {}->{}", old_shards.len(), new_shard_count); } Some(("status", matches)) => { @@ -537,7 +543,7 @@ async fn handle_tenant( let mut tenant_synthetic_size = None; let attachment_service = AttachmentService::from_env(env); - for shard in attachment_service.tenant_locate(tenant_id)?.shards { + for shard in attachment_service.tenant_locate(tenant_id).await?.shards { let pageserver = PageServerNode::from_env(env, env.get_pageserver_conf(shard.node_id)?); @@ -555,7 +561,8 @@ async fn handle_tenant( ]); if shard.shard_id.is_zero() { - tenant_synthetic_size = Some(pageserver.tenant_synthetic_size(shard.shard_id)?); + tenant_synthetic_size = + Some(pageserver.tenant_synthetic_size(shard.shard_id).await?); } } @@ -903,7 +910,8 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re let attachment_service = AttachmentService::from_env(env); let pageservers = attachment_service - .tenant_locate(endpoint.tenant_id)? + .tenant_locate(endpoint.tenant_id) + .await? .shards .into_iter() .map(|shard| { @@ -948,7 +956,8 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re } else { let attachment_service = AttachmentService::from_env(env); attachment_service - .tenant_locate(endpoint.tenant_id)? + .tenant_locate(endpoint.tenant_id) + .await? .shards .into_iter() .map(|shard| { diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index d43d39bde9..e076b27a48 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -491,8 +491,11 @@ impl PageServerNode { .await?) } - pub async fn timeline_list(&self, tenant_id: &TenantId) -> anyhow::Result> { - Ok(self.http_client.list_timelines(*tenant_id).await?) + pub async fn timeline_list( + &self, + tenant_shard_id: &TenantShardId, + ) -> anyhow::Result> { + Ok(self.http_client.list_timelines(*tenant_shard_id).await?) } pub async fn timeline_create( @@ -589,4 +592,14 @@ impl PageServerNode { Ok(()) } + + pub async fn tenant_synthetic_size( + &self, + tenant_shard_id: TenantShardId, + ) -> anyhow::Result { + Ok(self + .http_client + .tenant_synthetic_size(tenant_shard_id) + .await?) + } }