From 9cd72caabf316727f651e053fb1a8ef007c35cb6 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 8 Dec 2023 11:44:56 +0000 Subject: [PATCH] neon_local: add tenant status command --- control_plane/src/attachment_service.rs | 1 + control_plane/src/bin/attachment_service.rs | 11 ++++- control_plane/src/bin/neon_local.rs | 49 +++++++++++++++++++++ control_plane/src/pageserver.rs | 20 +++------ libs/pageserver_api/src/models.rs | 16 +++++++ pageserver/client/src/mgmt_api.rs | 21 +++++++-- pageserver/client/src/mgmt_api/util.rs | 6 ++- 7 files changed, 105 insertions(+), 19 deletions(-) diff --git a/control_plane/src/attachment_service.rs b/control_plane/src/attachment_service.rs index 65eed2980e..56e3fac565 100644 --- a/control_plane/src/attachment_service.rs +++ b/control_plane/src/attachment_service.rs @@ -66,6 +66,7 @@ pub struct NodeRegisterRequest { #[derive(Serialize, Deserialize)] pub struct TenantLocateResponseShard { + pub shard_id: TenantShardId, pub node_id: NodeId, pub listen_pg_addr: String, diff --git a/control_plane/src/bin/attachment_service.rs b/control_plane/src/bin/attachment_service.rs index dadcf2f606..27fde0aab2 100644 --- a/control_plane/src/bin/attachment_service.rs +++ b/control_plane/src/bin/attachment_service.rs @@ -494,7 +494,13 @@ impl Scheduler { let mut tenant_counts: Vec<(NodeId, usize)> = self.tenant_counts.iter().map(|(k, v)| (*k, *v)).collect(); tenant_counts.sort_by_key(|i| i.1); - let node_id = tenant_counts.last().unwrap().0; + + for (node_id, count) in &tenant_counts { + tracing::info!("tenant_counts[{node_id}]={count}"); + } + + let node_id = tenant_counts.first().unwrap().0; + tracing::info!("scheduler selected node {node_id}"); *self.tenant_counts.get_mut(&node_id).unwrap() += 1; Ok(node_id) } @@ -664,7 +670,7 @@ async fn handle_tenant_locate(req: Request) -> Result, ApiE let mut result = Vec::new(); - for (_tenant_shard_id, shard) in locked + for (tenant_shard_id, shard) in locked .tenants .range_mut(TenantShardId::tenant_range(tenant_id)) { @@ -679,6 +685,7 @@ async fn handle_tenant_locate(req: Request) -> Result, ApiE .expect("Pageservers may not be deleted while referenced"); result.push(TenantLocateResponseShard { + shard_id: *tenant_shard_id, node_id, listen_http_addr: node.listen_http_addr.clone(), listen_http_port: node.listen_http_port, diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index ab4abeca2d..b2267f569d 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -515,6 +515,51 @@ async fn handle_tenant( migrate_tenant(env, tenant_shard_id, new_pageserver).await?; println!("tenant {tenant_shard_id} migrated to {}", new_pageserver_id); } + Some(("status", matches)) => { + let tenant_id = get_tenant_id(matches, env)?; + + let mut shard_table = comfy_table::Table::new(); + shard_table.set_header(["Shard", "Pageserver", "Physical Size"]); + + let mut tenant_synthetic_size = None; + + let attachment_service = AttachmentService::from_env(env); + for shard in attachment_service.tenant_locate(tenant_id)?.shards { + let pageserver = + PageServerNode::from_env(env, env.get_pageserver_conf(shard.node_id)?); + + let size = pageserver + .http_client + .tenant_details(shard.shard_id) + .await? + .tenant_info + .current_physical_size + .unwrap(); + shard_table.add_row([ + format!("{}", shard.shard_id.shard_number.0), + format!("{}", shard.node_id.0), + format!("{}", size), + ]); + + if shard.shard_id.is_zero() { + tenant_synthetic_size = Some(pageserver.tenant_synthetic_size(shard.shard_id)?); + } + } + + let Some(synthetic_size) = tenant_synthetic_size else { + bail!("Shard 0 not found") + }; + + let mut tenant_table = comfy_table::Table::new(); + tenant_table.add_row(["Tenant ID".to_string(), tenant_id.to_string()]); + tenant_table.add_row([ + "Synthetic size".to_string(), + format!("{}", synthetic_size.size.unwrap_or(0)), + ]); + + println!("{tenant_table}"); + println!("{shard_table}"); + } Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name), None => bail!("no tenant subcommand provided"), @@ -1417,6 +1462,10 @@ fn cli() -> Command { .about("Migrate a tenant from one pageserver to another") .arg(tenant_id_arg.clone()) .arg(pageserver_id_arg.clone())) + .subcommand(Command::new("status") + .about("Human readable summary of the tenant's shards and attachment locations") + .arg(tenant_id_arg.clone()) + ) ) .subcommand( Command::new("pageserver") diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 1956bcb196..4e5c8643a5 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -18,7 +18,9 @@ use std::time::Duration; use anyhow::{bail, Context}; use camino::Utf8PathBuf; use futures::SinkExt; -use pageserver_api::models::{self, LocationConfig, ShardParameters, TenantInfo, TimelineInfo}; +use pageserver_api::models::{ + self, LocationConfig, ShardParameters, TenantHistorySize, TenantInfo, TimelineInfo, +}; use pageserver_api::shard::TenantShardId; use pageserver_client::mgmt_api; use postgres_backend::AuthType; @@ -302,16 +304,8 @@ impl PageServerNode { pub async fn tenant_list(&self) -> mgmt_api::Result> { self.http_client.list_tenants().await } - - pub async fn tenant_create( - &self, - new_tenant_id: TenantId, - generation: Option, - settings: HashMap<&str, &str>, - ) -> anyhow::Result { - let mut settings = settings.clone(); - - let config = models::TenantConfig { + pub fn parse_config(mut settings: HashMap<&str, &str>) -> anyhow::Result { + let result = models::TenantConfig { checkpoint_distance: settings .remove("checkpoint_distance") .map(|x| x.parse::()) @@ -379,13 +373,13 @@ impl PageServerNode { } } - pub fn tenant_create( + pub async fn tenant_create( &self, new_tenant_id: TenantId, generation: Option, settings: HashMap<&str, &str>, ) -> anyhow::Result { - let config = Self::parse_config(settings)?; + let config = Self::parse_config(settings.clone())?; let request = models::TenantCreateRequest { new_tenant_id: TenantShardId::unsharded(new_tenant_id), diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index c5f8e403a0..68019c0a1e 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -705,6 +705,22 @@ pub struct PagestreamDbSizeResponse { pub db_size: i64, } +#[derive(Serialize, Deserialize, Debug)] +pub struct TenantPhysicalSizeResponse { + pub size: u64, +} + +// XXX hack: this is a cut-down version of TenantHistorySize from the pageserver crate, omitting fields +// that require pageserver-internal types. It is sufficient to get the total size. +#[derive(Serialize, Deserialize, Debug)] +pub struct TenantHistorySize { + pub id: TenantId, + /// Size is a mixture of WAL and logical size, so the unit is bytes. + /// + /// Will be none if `?inputs_only=true` was given. + pub size: Option, +} + impl PagestreamFeMessage { pub fn serialize(&self) -> Bytes { let mut bytes = BytesMut::new(); diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs index d3daf68ea5..ee68c04e7b 100644 --- a/pageserver/client/src/mgmt_api.rs +++ b/pageserver/client/src/mgmt_api.rs @@ -1,4 +1,4 @@ -use pageserver_api::models::*; +use pageserver_api::{models::*, shard::TenantShardId}; use reqwest::{IntoUrl, Method}; use utils::{ http::error::HttpErrorBody, @@ -68,9 +68,9 @@ impl Client { pub async fn tenant_details( &self, - tenant_id: TenantId, + tenant_shard_id: TenantShardId, ) -> Result { - let uri = format!("{}/v1/tenant/{tenant_id}", self.mgmt_api_endpoint); + let uri = format!("{}/v1/tenant/{tenant_shard_id}", self.mgmt_api_endpoint); self.get(uri) .await? .json() @@ -205,4 +205,19 @@ impl Client { .await .map_err(Error::ReceiveBody) } + + pub async fn tenant_synthetic_size( + &self, + tenant_shard_id: TenantShardId, + ) -> Result { + let uri = format!( + "{}/v1/tenant/{}/synthetic_size", + self.mgmt_api_endpoint, tenant_shard_id + ); + self.get(&uri) + .await? + .json() + .await + .map_err(Error::ReceiveBody) + } } diff --git a/pageserver/client/src/mgmt_api/util.rs b/pageserver/client/src/mgmt_api/util.rs index 048a3bb7cd..bd85506d10 100644 --- a/pageserver/client/src/mgmt_api/util.rs +++ b/pageserver/client/src/mgmt_api/util.rs @@ -2,6 +2,7 @@ use std::sync::Arc; +use pageserver_api::shard::TenantShardId; use tokio::task::JoinSet; use utils::id::{TenantId, TenantTimelineId}; @@ -31,7 +32,10 @@ pub async fn get_pageserver_tenant_timelines_unsharded( async move { ( tenant_id, - mgmt_api_client.tenant_details(tenant_id).await.unwrap(), + mgmt_api_client + .tenant_details(TenantShardId::unsharded(tenant_id)) + .await + .unwrap(), ) } });