From d1613ebae3a810e30eeba425e22f1f2db9497d3d Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Thu, 15 Jun 2023 15:24:27 +0300 Subject: [PATCH] Add master_broker_endpoint field to tenant config --- control_plane/src/pageserver.rs | 6 +++++ libs/pageserver_api/src/models.rs | 2 ++ pageserver/src/tenant.rs | 41 +++++++++++++++++++++++++------ pageserver/src/tenant/config.rs | 8 ++++++ 4 files changed, 50 insertions(+), 7 deletions(-) diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 71faec9f2b..b47c5375c8 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -381,6 +381,9 @@ impl PageServerNode { .transpose() .context("Failed to parse 'gc_feedback' as bool")?, master_region: settings.remove("master_region").map(|x| x.to_string()), + master_broker_endpoint: settings + .remove("master_broker_endpoint") + .map(|x| x.to_string()), }; // If tenant ID was not specified, generate one @@ -481,6 +484,9 @@ impl PageServerNode { .transpose() .context("Failed to parse 'gc_feedback' as bool")?, master_region: settings.remove("master_region").map(|x| x.to_string()), + master_broker_endpoint: settings + .remove("master_broker_endpoint") + .map(|x| x.to_string()), } }; diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index ac7699d9f4..c85297ef3f 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -224,6 +224,7 @@ pub struct TenantConfig { pub evictions_low_residence_duration_metric_threshold: Option, pub gc_feedback: Option, pub master_region: Option, + pub master_broker_endpoint: Option, } #[serde_as] @@ -284,6 +285,7 @@ impl TenantConfigRequest { evictions_low_residence_duration_metric_threshold: None, gc_feedback: None, master_region: None, + master_broker_endpoint: None, }; TenantConfigRequest { tenant_id, config } } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 0f2b15c2fe..a852caf653 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -666,7 +666,7 @@ impl Tenant { match tenant_clone.attach(&ctx).await { Ok(()) => { info!("attach finished, activating"); - tenant_clone.activate(broker_client, None, &ctx); + tenant_clone.activate(broker_client, None, &ctx)?; } Err(e) => { error!("attach failed, setting tenant state to Broken: {:?}", e); @@ -953,7 +953,7 @@ impl Tenant { Ok(()) => { debug!("load finished, activating"); let background_jobs_can_start = init_order.as_ref().map(|x| &x.background_jobs_can_start); - tenant_clone.activate(broker_client, background_jobs_can_start, &ctx); + tenant_clone.activate(broker_client, background_jobs_can_start, &ctx)?; } Err(err) => { error!("load failed, setting tenant state to Broken: {err:?}"); @@ -1329,9 +1329,16 @@ impl Tenant { pub async fn create_timeline_replica( &self, timeline_id: TimelineId, - master_broker_endpoint: String, ctx: &RequestContext, ) -> anyhow::Result<()> { + let master_broker_endpoint = self + .tenant_conf + .read() + .unwrap() + .master_broker_endpoint + .as_ref() + .unwrap() + .clone(); let broker_client = storage_broker::connect(master_broker_endpoint, self.conf.broker_keepalive_interval)?; @@ -1443,7 +1450,7 @@ impl Tenant { .expect("timeline found at master"); for (fname, meta) in &index_part.layer_metadata { if !range_eq(&fname.get_key_range(), &(Key::MIN..Key::MAX)) - && fname.get_lsn_range().start < replica_lsn + && fname.get_lsn_range().start < timeline_metadata.ancestor_lsn() { layer_metadata.insert(fname.clone(), LayerFileMetadata::from(meta)); } @@ -1602,7 +1609,7 @@ impl Tenant { } }; - loaded_timeline.activate(broker_client, None, ctx); + loaded_timeline.activate(self.get_broker_channel(broker_client)?, None, ctx); if let Some(remote_client) = loaded_timeline.remote_client.as_ref() { // Wait for the upload of the 'index_part.json` file to finish, so that when we return @@ -2018,6 +2025,21 @@ impl Tenant { self.current_state() == TenantState::Active } + fn get_broker_channel( + &self, + broker_client: BrokerClientChannel, + ) -> anyhow::Result { + let tenent_config_guard = self.tenant_conf.read().unwrap(); + if let Some(master_broker_endpoint) = &tenent_config_guard.master_broker_endpoint { + storage_broker::connect( + master_broker_endpoint.clone(), + self.conf.broker_keepalive_interval, + ) + } else { + Ok(broker_client) + } + } + /// Changes tenant status to active, unless shutdown was already requested. /// /// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup @@ -2027,7 +2049,7 @@ impl Tenant { broker_client: BrokerClientChannel, background_jobs_can_start: Option<&completion::Barrier>, ctx: &RequestContext, - ) { + ) -> anyhow::Result<()> { debug_assert_current_span_has_tenant_id(); let mut activating = false; @@ -2063,7 +2085,11 @@ impl Tenant { let mut activated_timelines = 0; for timeline in timelines_to_activate { - timeline.activate(broker_client.clone(), background_jobs_can_start, ctx); + timeline.activate( + self.get_broker_channel(broker_client.clone())?, + background_jobs_can_start, + ctx, + ); activated_timelines += 1; } @@ -2089,6 +2115,7 @@ impl Tenant { ); }); } + Ok(()) } /// Shutdown the tenant and join all of the spawned tasks. diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs index be7d427fb9..a432551616 100644 --- a/pageserver/src/tenant/config.rs +++ b/pageserver/src/tenant/config.rs @@ -102,6 +102,7 @@ pub struct TenantConf { pub gc_feedback: bool, // Region for master S3 bucket pub master_region: Option, + pub master_broker_endpoint: Option, } /// Same as TenantConf, but this struct preserves the information about @@ -186,6 +187,10 @@ pub struct TenantConfOpt { #[serde(skip_serializing_if = "Option::is_none")] #[serde(default)] pub master_region: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + pub master_broker_endpoint: Option, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] @@ -255,6 +260,7 @@ impl TenantConfOpt { .unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold), gc_feedback: self.gc_feedback.unwrap_or(global_conf.gc_feedback), master_region: self.master_region.clone(), + master_broker_endpoint: self.master_broker_endpoint.clone(), } } } @@ -293,6 +299,7 @@ impl Default for TenantConf { .expect("cannot parse default evictions_low_residence_duration_metric_threshold"), gc_feedback: false, master_region: None, + master_broker_endpoint: None, } } } @@ -389,6 +396,7 @@ impl TryFrom<&'_ models::TenantConfig> for TenantConfOpt { } tenant_conf.gc_feedback = request_data.gc_feedback; tenant_conf.master_region = request_data.master_region.clone(); + tenant_conf.master_broker_endpoint = request_data.master_broker_endpoint.clone(); Ok(tenant_conf) }