Add master_broker_endpoint field to tenant config

This commit is contained in:
Konstantin Knizhnik
2023-06-15 15:24:27 +03:00
parent 07c8f70a3b
commit d1613ebae3
4 changed files with 50 additions and 7 deletions

View File

@@ -381,6 +381,9 @@ impl PageServerNode {
.transpose()
.context("Failed to parse 'gc_feedback' as bool")?,
master_region: settings.remove("master_region").map(|x| x.to_string()),
master_broker_endpoint: settings
.remove("master_broker_endpoint")
.map(|x| x.to_string()),
};
// If tenant ID was not specified, generate one
@@ -481,6 +484,9 @@ impl PageServerNode {
.transpose()
.context("Failed to parse 'gc_feedback' as bool")?,
master_region: settings.remove("master_region").map(|x| x.to_string()),
master_broker_endpoint: settings
.remove("master_broker_endpoint")
.map(|x| x.to_string()),
}
};

View File

@@ -224,6 +224,7 @@ pub struct TenantConfig {
pub evictions_low_residence_duration_metric_threshold: Option<String>,
pub gc_feedback: Option<bool>,
pub master_region: Option<String>,
pub master_broker_endpoint: Option<String>,
}
#[serde_as]
@@ -284,6 +285,7 @@ impl TenantConfigRequest {
evictions_low_residence_duration_metric_threshold: None,
gc_feedback: None,
master_region: None,
master_broker_endpoint: None,
};
TenantConfigRequest { tenant_id, config }
}

View File

@@ -666,7 +666,7 @@ impl Tenant {
match tenant_clone.attach(&ctx).await {
Ok(()) => {
info!("attach finished, activating");
tenant_clone.activate(broker_client, None, &ctx);
tenant_clone.activate(broker_client, None, &ctx)?;
}
Err(e) => {
error!("attach failed, setting tenant state to Broken: {:?}", e);
@@ -953,7 +953,7 @@ impl Tenant {
Ok(()) => {
debug!("load finished, activating");
let background_jobs_can_start = init_order.as_ref().map(|x| &x.background_jobs_can_start);
tenant_clone.activate(broker_client, background_jobs_can_start, &ctx);
tenant_clone.activate(broker_client, background_jobs_can_start, &ctx)?;
}
Err(err) => {
error!("load failed, setting tenant state to Broken: {err:?}");
@@ -1329,9 +1329,16 @@ impl Tenant {
pub async fn create_timeline_replica(
&self,
timeline_id: TimelineId,
master_broker_endpoint: String,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let master_broker_endpoint = self
.tenant_conf
.read()
.unwrap()
.master_broker_endpoint
.as_ref()
.unwrap()
.clone();
let broker_client =
storage_broker::connect(master_broker_endpoint, self.conf.broker_keepalive_interval)?;
@@ -1443,7 +1450,7 @@ impl Tenant {
.expect("timeline found at master");
for (fname, meta) in &index_part.layer_metadata {
if !range_eq(&fname.get_key_range(), &(Key::MIN..Key::MAX))
&& fname.get_lsn_range().start < replica_lsn
&& fname.get_lsn_range().start < timeline_metadata.ancestor_lsn()
{
layer_metadata.insert(fname.clone(), LayerFileMetadata::from(meta));
}
@@ -1602,7 +1609,7 @@ impl Tenant {
}
};
loaded_timeline.activate(broker_client, None, ctx);
loaded_timeline.activate(self.get_broker_channel(broker_client)?, None, ctx);
if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
// Wait for the upload of the 'index_part.json` file to finish, so that when we return
@@ -2018,6 +2025,21 @@ impl Tenant {
self.current_state() == TenantState::Active
}
fn get_broker_channel(
&self,
broker_client: BrokerClientChannel,
) -> anyhow::Result<BrokerClientChannel> {
let tenent_config_guard = self.tenant_conf.read().unwrap();
if let Some(master_broker_endpoint) = &tenent_config_guard.master_broker_endpoint {
storage_broker::connect(
master_broker_endpoint.clone(),
self.conf.broker_keepalive_interval,
)
} else {
Ok(broker_client)
}
}
/// Changes tenant status to active, unless shutdown was already requested.
///
/// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup
@@ -2027,7 +2049,7 @@ impl Tenant {
broker_client: BrokerClientChannel,
background_jobs_can_start: Option<&completion::Barrier>,
ctx: &RequestContext,
) {
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
let mut activating = false;
@@ -2063,7 +2085,11 @@ impl Tenant {
let mut activated_timelines = 0;
for timeline in timelines_to_activate {
timeline.activate(broker_client.clone(), background_jobs_can_start, ctx);
timeline.activate(
self.get_broker_channel(broker_client.clone())?,
background_jobs_can_start,
ctx,
);
activated_timelines += 1;
}
@@ -2089,6 +2115,7 @@ impl Tenant {
);
});
}
Ok(())
}
/// Shutdown the tenant and join all of the spawned tasks.

View File

@@ -102,6 +102,7 @@ pub struct TenantConf {
pub gc_feedback: bool,
// Region for master S3 bucket
pub master_region: Option<String>,
pub master_broker_endpoint: Option<String>,
}
/// Same as TenantConf, but this struct preserves the information about
@@ -186,6 +187,10 @@ pub struct TenantConfOpt {
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub master_region: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub master_broker_endpoint: Option<String>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
@@ -255,6 +260,7 @@ impl TenantConfOpt {
.unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold),
gc_feedback: self.gc_feedback.unwrap_or(global_conf.gc_feedback),
master_region: self.master_region.clone(),
master_broker_endpoint: self.master_broker_endpoint.clone(),
}
}
}
@@ -293,6 +299,7 @@ impl Default for TenantConf {
.expect("cannot parse default evictions_low_residence_duration_metric_threshold"),
gc_feedback: false,
master_region: None,
master_broker_endpoint: None,
}
}
}
@@ -389,6 +396,7 @@ impl TryFrom<&'_ models::TenantConfig> for TenantConfOpt {
}
tenant_conf.gc_feedback = request_data.gc_feedback;
tenant_conf.master_region = request_data.master_region.clone();
tenant_conf.master_broker_endpoint = request_data.master_broker_endpoint.clone();
Ok(tenant_conf)
}