Compare commits

...

7 Commits

Author SHA1 Message Date
Konstantin Knizhnik
d55a33a8a2 Do not compact master L0 layers for cross-region replica 2023-06-20 11:09:56 +03:00
Konstantin Knizhnik
3fa72bbf5d Force replacing of master layers with local layers in cross-region pageserver replica 2023-06-18 14:57:30 +03:00
Konstantin Knizhnik
d1613ebae3 Add master_broker_endpoint field to tenant config 2023-06-15 15:24:27 +03:00
Konstantin Knizhnik
07c8f70a3b Refactor uploading of index_part.json in create_timeline_replica 2023-06-15 10:39:30 +03:00
Konstantin Knizhnik
77a73ff36f Fix build problems with create_timeline_replica 2023-06-14 22:14:09 +03:00
Konstantin Knizhnik
9030abb426 Create replica timeline in another region 2023-06-14 09:30:29 +03:00
Konstantin Knizhnik
6e549097e8 Support work of page server with two S3 buckets 2023-06-08 18:16:08 +03:00
11 changed files with 447 additions and 48 deletions

View File

@@ -380,6 +380,10 @@ impl PageServerNode {
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'gc_feedback' as bool")?,
master_region: settings.remove("master_region").map(|x| x.to_string()),
master_broker_endpoint: settings
.remove("master_broker_endpoint")
.map(|x| x.to_string()),
};
// If tenant ID was not specified, generate one
@@ -479,6 +483,10 @@ impl PageServerNode {
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'gc_feedback' as bool")?,
master_region: settings.remove("master_region").map(|x| x.to_string()),
master_broker_endpoint: settings
.remove("master_broker_endpoint")
.map(|x| x.to_string()),
}
};

View File

@@ -223,6 +223,8 @@ pub struct TenantConfig {
pub min_resident_size_override: Option<u64>,
pub evictions_low_residence_duration_metric_threshold: Option<String>,
pub gc_feedback: Option<bool>,
pub master_region: Option<String>,
pub master_broker_endpoint: Option<String>,
}
#[serde_as]
@@ -282,6 +284,8 @@ impl TenantConfigRequest {
min_resident_size_override: None,
evictions_low_residence_duration_metric_threshold: None,
gc_feedback: None,
master_region: None,
master_broker_endpoint: None,
};
TenantConfigRequest { tenant_id, config }
}

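For illustration only, a minimal sketch (not part of this change set) of filling the two new fields when building a tenant config request; it assumes TenantConfig implements Default and that the struct fields are public, and the region and endpoint values are made up:

// Hedged sketch: enable cross-region replication for a tenant by pointing it at the
// master's S3 region and storage broker endpoint (values are illustrative only).
let mut config = TenantConfig::default();
config.master_region = Some("us-east-1".to_string());
config.master_broker_endpoint = Some("http://master-broker.example:50051".to_string());
let request = TenantConfigRequest { tenant_id, config };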
View File

@@ -341,6 +341,20 @@ impl Debug for S3Config {
}
impl RemoteStorageConfig {
pub fn in_region(&self, region: String) -> anyhow::Result<RemoteStorageConfig> {
let self_clone = self.clone();
if let RemoteStorageKind::AwsS3(config) = self_clone.storage {
let mut storage = config;
storage.bucket_region = region;
Ok(RemoteStorageConfig {
storage: RemoteStorageKind::AwsS3(storage),
..self_clone
})
} else {
bail!("Only AWS3 storage can be used in other region")
}
}
pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<Option<RemoteStorageConfig>> {
let local_path = toml.get("local_path");
let bucket_name = toml.get("bucket_name");

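As a usage sketch (not taken from the diff), in_region is what lets the pageserver derive a second, master-region storage handle from its regular remote storage config, matching the GenericRemoteStorage::from_config call that appears in tenant.rs below; the region string here is a made-up example:

// Hedged sketch: clone the local S3 config into the master's region and build a client for it.
let master_storage_config = remote_storage_config.in_region("us-east-1".to_string())?;
let master_storage = GenericRemoteStorage::from_config(&master_storage_config)?;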
View File

@@ -142,6 +142,7 @@ fn handle_metadata(
meta.latest_gc_cutoff_lsn(),
meta.initdb_lsn(),
meta.pg_version(),
meta.replica_lsn(),
);
update_meta = true;
}
@@ -154,6 +155,7 @@ fn handle_metadata(
meta.latest_gc_cutoff_lsn(),
meta.initdb_lsn(),
meta.pg_version(),
meta.replica_lsn(),
);
update_meta = true;
}
@@ -166,6 +168,7 @@ fn handle_metadata(
*latest_gc_cuttoff,
meta.initdb_lsn(),
meta.pg_version(),
meta.replica_lsn(),
);
update_meta = true;
}

View File

@@ -60,12 +60,11 @@ use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::metadata::load_metadata;
use crate::tenant::remote_timeline_client::index::IndexPart;
use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart;
use crate::tenant::remote_timeline_client::PersistIndexPartWithDeletedFlagError;
use crate::tenant::storage_layer::DeltaLayer;
use crate::tenant::storage_layer::ImageLayer;
use crate::tenant::storage_layer::Layer;
use crate::tenant::remote_timeline_client::index::{IndexPart, LayerFileMetadata};
use crate::tenant::remote_timeline_client::{
MaybeDeletedIndexPart, PersistIndexPartWithDeletedFlagError,
};
use crate::tenant::storage_layer::{DeltaLayer, ImageLayer, Layer, LayerFileName};
use crate::InitializationOrder;
use crate::virtual_file::VirtualFile;
@@ -153,6 +152,9 @@ pub struct Tenant {
// provides access to timeline data sitting in the remote storage
remote_storage: Option<GenericRemoteStorage>,
// for cross-region replication: provides access to the master S3 bucket
master_storage: Option<GenericRemoteStorage>,
/// Cached logical sizes updated on each [`Tenant::gather_size_inputs`].
cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
cached_synthetic_tenant_size: Arc<AtomicU64>,
@@ -663,7 +665,7 @@ impl Tenant {
match tenant_clone.attach(&ctx).await {
Ok(()) => {
info!("attach finished, activating");
tenant_clone.activate(broker_client, None, &ctx);
tenant_clone.activate(broker_client, None, &ctx)?;
}
Err(e) => {
error!("attach failed, setting tenant state to Broken: {:?}", e);
@@ -950,7 +952,7 @@ impl Tenant {
Ok(()) => {
debug!("load finished, activating");
let background_jobs_can_start = init_order.as_ref().map(|x| &x.background_jobs_can_start);
tenant_clone.activate(broker_client, background_jobs_can_start, &ctx);
tenant_clone.activate(broker_client, background_jobs_can_start, &ctx)?;
}
Err(err) => {
error!("load failed, setting tenant state to Broken: {err:?}");
@@ -1291,6 +1293,7 @@ impl Tenant {
initdb_lsn,
initdb_lsn,
pg_version,
None,
);
self.prepare_timeline(
new_timeline_id,
@@ -1322,6 +1325,218 @@ impl Tenant {
Ok(tl)
}
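/// Create a cross-region replica of an existing timeline: download index parts from the
/// master's S3 bucket, merge the layer sets of the timeline and its ancestors into one
/// flattened timeline, persist new metadata and index_part.json locally, and activate it.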
pub async fn create_timeline_replica(
&self,
timeline_id: TimelineId,
ctx: &RequestContext,
) -> anyhow::Result<()> {
// We need to connect to the broker in the master's region to choose a safekeeper to subscribe to
let master_broker_endpoint = self
.tenant_conf
.read()
.unwrap()
.master_broker_endpoint
.as_ref()
.unwrap()
.clone();
let broker_client =
storage_broker::connect(master_broker_endpoint, self.conf.broker_keepalive_interval)?;
// Access to S3 bucket in master's region
let master_storage = self
.master_storage
.as_ref()
.ok_or_else(|| anyhow::anyhow!("master storage not specified"))?;
let master_client = RemoteTimelineClient::new(
master_storage.clone(),
self.conf,
self.tenant_id,
timeline_id,
);
// Access to local S3 bucket in this region
let remote_storage = self
.remote_storage
.as_ref()
.ok_or_else(|| anyhow::anyhow!("remote storage not specified"))?;
let remote_client = RemoteTimelineClient::new(
remote_storage.clone(),
self.conf,
self.tenant_id,
timeline_id,
);
// Get the list of all timelines from the master. We actually need only the
// ancestors of the target timeline, not all of them.
let remote_timeline_ids = remote_timeline_client::list_remote_timelines(
master_storage,
self.conf,
self.tenant_id,
)
.await?;
// Download & parse index parts
let mut part_downloads = JoinSet::new();
for timeline_id in remote_timeline_ids {
let client = RemoteTimelineClient::new(
master_storage.clone(),
self.conf,
self.tenant_id,
timeline_id,
);
part_downloads.spawn(
async move {
debug!("starting index part download");
let index_part = client
.download_index_file()
.await
.context("download index file")?;
debug!("finished index part download");
Result::<_, anyhow::Error>::Ok((timeline_id, client, index_part))
}
.map(move |res| {
res.with_context(|| format!("download index part for timeline {timeline_id}"))
})
.instrument(info_span!("download_index_part", timeline=%timeline_id)),
);
}
// Wait for all the download tasks to complete & collect results.
let mut remote_index_and_client = HashMap::new();
while let Some(result) = part_downloads.join_next().await {
// NB: we already added timeline_id as context to the error
let result: Result<_, anyhow::Error> = result.context("joinset task join")?;
let (timeline_id, client, index_part) = result?;
debug!("successfully downloaded index part for timeline {timeline_id}");
match index_part {
MaybeDeletedIndexPart::IndexPart(index_part) => {
remote_index_and_client.insert(timeline_id, (index_part, client));
}
MaybeDeletedIndexPart::Deleted(_) => {
info!("timeline {} is deleted, skipping", timeline_id);
continue;
}
}
}
let (index_part, _client) = remote_index_and_client
.get(&timeline_id)
.expect("timeline found at master");
let mut timeline_metadata = index_part.parse_metadata().context("parse_metadata")?;
// Convert IndexLayerMetadata to LayerFileMetadata
let mut layer_metadata: HashMap<LayerFileName, LayerFileMetadata> = index_part
.layer_metadata
.iter()
.map(|(fname, meta)| (fname.clone(), LayerFileMetadata::from(meta)))
.collect();
// Let replica_lsn be the largest layer end LSN
let replica_lsn = layer_metadata
.keys()
.map(|fname| fname.get_lsn_range().end)
.max()
.unwrap_or(timeline_metadata.ancestor_lsn());
let old_metadata = timeline_metadata.clone();
// Now collect layers of ancestor branches. We do not want to reconstruct the exact branch
// hierarchy at the replica, because that would require maintaining several timelines.
// Instead we just collect all layers which may be required for the current timeline.
while let Some(ancestor_id) = timeline_metadata.ancestor_timeline() {
let (index_part, _client) = remote_index_and_client
.get(&ancestor_id)
.expect("timeline found at master");
for (fname, meta) in &index_part.layer_metadata {
if fname.get_lsn_range().start < timeline_metadata.ancestor_lsn() {
layer_metadata.insert(fname.clone(), LayerFileMetadata::from(meta));
}
}
timeline_metadata = index_part.parse_metadata().context("parse_metadata")?;
}
let new_metadata = TimelineMetadata::new(
old_metadata.disk_consistent_lsn(),
old_metadata.prev_record_lsn(),
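// ancestor_timeline / ancestor_lsn are cleared: the replica keeps a single flattened timeline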
None,
Lsn::INVALID,
old_metadata.latest_gc_cutoff_lsn(),
old_metadata.initdb_lsn(),
old_metadata.pg_version(),
Some(replica_lsn),
);
// Initialize data directories for new timeline
tokio::fs::create_dir_all(self.conf.timeline_path(&timeline_id, &self.tenant_id))
.await
.context("Failed to create new timeline directory")?;
// Save timeline metadata
save_metadata(self.conf, timeline_id, self.tenant_id, &new_metadata, true)
.context("Failed to create timeline metadata")?;
// construct new index_part.json with combined list of layers
let index_part = IndexPart::new(
layer_metadata,
old_metadata.disk_consistent_lsn(),
new_metadata.to_bytes()?,
);
remote_client.init_upload_queue(&index_part)?;
let timeline = Timeline::new(
self.conf,
Arc::clone(&self.tenant_conf),
&new_metadata,
None, // we do not need to restore the branch hierarchy at the replica
timeline_id,
self.tenant_id,
Arc::clone(&self.walredo_mgr),
Some(remote_client),
Some(master_client),
Some(replica_lsn),
old_metadata.pg_version(),
None, // no need to calculate logical size at the replica
None,
);
// Wait for completion of the index part upload
timeline
.remote_client
.as_ref()
.unwrap()
.wait_completion()
.await
.context("wait for index part upload to complete")?;
/* Do we need to perform explicit upload?
// Upload this index_part.json to S3 bucket
upload_index_part(
self.conf,
&remote_storage,
self.tenant_id,
timeline_id,
&index_part,
)
.await?;
*/
timeline
.create_remote_layers(
&index_part,
HashMap::new(), // no local layers
replica_lsn,
)
.await?;
// Start background jobs for this timeline
timeline.activate(broker_client, None, ctx);
Ok(())
}
/// Create a new timeline.
///
/// Returns the new timeline ID and reference to its Timeline object.
@@ -1402,7 +1617,7 @@ impl Tenant {
}
};
loaded_timeline.activate(broker_client, None, ctx);
loaded_timeline.activate(self.get_broker_channel(broker_client)?, None, ctx);
if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
// Wait for the upload of the 'index_part.json` file to finish, so that when we return
@@ -1818,6 +2033,21 @@ impl Tenant {
self.current_state() == TenantState::Active
}
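/// For a cross-region replica, return a broker client connected to the master region's
/// storage broker; otherwise fall back to the broker client passed in by the caller.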
fn get_broker_channel(
&self,
broker_client: BrokerClientChannel,
) -> anyhow::Result<BrokerClientChannel> {
let tenant_config_guard = self.tenant_conf.read().unwrap();
if let Some(master_broker_endpoint) = &tenant_config_guard.master_broker_endpoint {
storage_broker::connect(
master_broker_endpoint.clone(),
self.conf.broker_keepalive_interval,
)
} else {
Ok(broker_client)
}
}
/// Changes tenant status to active, unless shutdown was already requested.
///
/// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup
@@ -1827,7 +2057,7 @@ impl Tenant {
broker_client: BrokerClientChannel,
background_jobs_can_start: Option<&completion::Barrier>,
ctx: &RequestContext,
) {
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
let mut activating = false;
@@ -1863,7 +2093,11 @@ impl Tenant {
let mut activated_timelines = 0;
for timeline in timelines_to_activate {
timeline.activate(broker_client.clone(), background_jobs_can_start, ctx);
timeline.activate(
self.get_broker_channel(broker_client.clone())?,
background_jobs_can_start,
ctx,
);
activated_timelines += 1;
}
@@ -1889,6 +2123,7 @@ impl Tenant {
);
});
}
Ok(())
}
/// Shutdown the tenant and join all of the spawned tasks.
@@ -2152,12 +2387,12 @@ fn tree_sort_timelines(
impl Tenant {
pub fn tenant_specific_overrides(&self) -> TenantConfOpt {
*self.tenant_conf.read().unwrap()
self.tenant_conf.read().unwrap().clone()
}
pub fn effective_config(&self) -> TenantConf {
self.tenant_specific_overrides()
.merge(self.conf.default_tenant_conf)
.merge(self.conf.default_tenant_conf.clone())
}
pub fn get_checkpoint_distance(&self) -> u64 {
@@ -2266,6 +2501,16 @@ impl Tenant {
let initial_logical_size_can_start = init_order.map(|x| &x.initial_logical_size_can_start);
let initial_logical_size_attempt = init_order.map(|x| &x.initial_logical_size_attempt);
let master_client = if let Some(master_storage) = &self.master_storage {
Some(RemoteTimelineClient::new(
master_storage.clone(),
self.conf,
self.tenant_id,
new_timeline_id,
))
} else {
None
};
let pg_version = new_metadata.pg_version();
Ok(Timeline::new(
self.conf,
@@ -2276,6 +2521,8 @@ impl Tenant {
self.tenant_id,
Arc::clone(&self.walredo_mgr),
remote_client,
master_client,
new_metadata.replica_lsn(),
pg_version,
initial_logical_size_can_start.cloned(),
initial_logical_size_attempt.cloned(),
@@ -2292,6 +2539,18 @@ impl Tenant {
) -> Tenant {
let (state, mut rx) = watch::channel(state);
let master_storage = if let Some(remote_storage_config) = &conf.remote_storage_config {
if let Some(region) = &tenant_conf.master_region {
let master_storage_config =
remote_storage_config.in_region(region.clone()).unwrap();
Some(GenericRemoteStorage::from_config(&master_storage_config).unwrap())
} else {
None
}
} else {
None
};
tokio::spawn(async move {
let mut current_state: &'static str = From::from(&*rx.borrow_and_update());
let tid = tenant_id.to_string();
@@ -2330,6 +2589,7 @@ impl Tenant {
gc_cs: tokio::sync::Mutex::new(()),
walredo_mgr,
remote_storage,
master_storage,
state,
cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
@@ -2751,6 +3011,7 @@ impl Tenant {
*src_timeline.latest_gc_cutoff_lsn.read(), // FIXME: should we hold onto this guard longer?
src_timeline.initdb_lsn,
src_timeline.pg_version,
None, // no branches at replica
);
let new_timeline = {
@@ -2838,6 +3099,7 @@ impl Tenant {
pgdata_lsn,
pgdata_lsn,
pg_version,
None,
);
let raw_timeline =
self.prepare_timeline(timeline_id, &new_metadata, timeline_uninit_mark, true, None)?;

View File

@@ -45,7 +45,7 @@ pub mod defaults {
}
/// Per-tenant configuration options
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TenantConf {
// Flush out an inmemory layer, if it's holding WAL older than this
// This puts a backstop on how much WAL needs to be re-digested if the
@@ -100,11 +100,14 @@ pub struct TenantConf {
#[serde(with = "humantime_serde")]
pub evictions_low_residence_duration_metric_threshold: Duration,
pub gc_feedback: bool,
// Region for master S3 bucket
pub master_region: Option<String>,
pub master_broker_endpoint: Option<String>,
}
/// Same as TenantConf, but this struct preserves the information about
/// which parameters are set and which are not.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct TenantConfOpt {
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
@@ -180,6 +183,14 @@ pub struct TenantConfOpt {
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub gc_feedback: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub master_region: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub master_broker_endpoint: Option<String>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
@@ -248,6 +259,8 @@ impl TenantConfOpt {
.evictions_low_residence_duration_metric_threshold
.unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold),
gc_feedback: self.gc_feedback.unwrap_or(global_conf.gc_feedback),
master_region: self.master_region.clone(),
master_broker_endpoint: self.master_broker_endpoint.clone(),
}
}
}
@@ -285,6 +298,8 @@ impl Default for TenantConf {
)
.expect("cannot parse default evictions_low_residence_duration_metric_threshold"),
gc_feedback: false,
master_region: None,
master_broker_endpoint: None,
}
}
}
@@ -380,6 +395,8 @@ impl TryFrom<&'_ models::TenantConfig> for TenantConfOpt {
);
}
tenant_conf.gc_feedback = request_data.gc_feedback;
tenant_conf.master_region = request_data.master_region.clone();
tenant_conf.master_broker_endpoint = request_data.master_broker_endpoint.clone();
Ok(tenant_conf)
}

View File

@@ -23,10 +23,7 @@ use crate::config::PageServerConf;
use crate::virtual_file::VirtualFile;
/// Use special format number to enable backward compatibility.
const METADATA_FORMAT_VERSION: u16 = 4;
/// Previous supported format versions.
const METADATA_OLD_FORMAT_VERSION: u16 = 3;
const METADATA_FORMAT_VERSION: u16 = 5;
/// We assume that a write of up to METADATA_MAX_SIZE bytes is atomic.
///
@@ -40,7 +37,7 @@ const METADATA_MAX_SIZE: usize = 512;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimelineMetadata {
hdr: TimelineMetadataHeader,
body: TimelineMetadataBodyV2,
body: TimelineMetadataBodyV3,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -72,6 +69,28 @@ struct TimelineMetadataBodyV2 {
pg_version: u32,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct TimelineMetadataBodyV3 {
disk_consistent_lsn: Lsn,
// This is only set if we know it. We track it in memory when the page
// server is running, but we only track the value corresponding to
// 'last_record_lsn', not 'disk_consistent_lsn' which can lag behind by a
// lot. We only store it in the metadata file when we flush *all* the
// in-memory data so that 'last_record_lsn' is the same as
// 'disk_consistent_lsn'. That's OK, because after page server restart, as
// soon as we reprocess at least one record, we will have a valid
// 'prev_record_lsn' value in memory again. This is only really needed when
// doing a clean shutdown, so that there is no more WAL beyond
// 'disk_consistent_lsn'
prev_record_lsn: Option<Lsn>,
ancestor_timeline: Option<TimelineId>,
ancestor_lsn: Lsn,
latest_gc_cutoff_lsn: Lsn,
initdb_lsn: Lsn,
pg_version: u32,
replica_lsn: Option<Lsn>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct TimelineMetadataBodyV1 {
disk_consistent_lsn: Lsn,
@@ -101,6 +120,7 @@ impl TimelineMetadata {
latest_gc_cutoff_lsn: Lsn,
initdb_lsn: Lsn,
pg_version: u32,
replica_lsn: Option<Lsn>,
) -> Self {
Self {
hdr: TimelineMetadataHeader {
@@ -108,7 +128,7 @@ impl TimelineMetadata {
size: 0,
format_version: METADATA_FORMAT_VERSION,
},
body: TimelineMetadataBodyV2 {
body: TimelineMetadataBodyV3 {
disk_consistent_lsn,
prev_record_lsn,
ancestor_timeline,
@@ -116,35 +136,48 @@ impl TimelineMetadata {
latest_gc_cutoff_lsn,
initdb_lsn,
pg_version,
replica_lsn,
},
}
}
fn upgrade_timeline_metadata(metadata_bytes: &[u8]) -> anyhow::Result<Self> {
let mut hdr = TimelineMetadataHeader::des(&metadata_bytes[0..METADATA_HDR_SIZE])?;
// backward compatible only up to this version
ensure!(
hdr.format_version == METADATA_OLD_FORMAT_VERSION,
"unsupported metadata format version {}",
hdr.format_version
);
let metadata_size = hdr.size as usize;
let body: TimelineMetadataBodyV1 =
TimelineMetadataBodyV1::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
let body = match hdr.format_version {
3 => {
let body: TimelineMetadataBodyV1 =
TimelineMetadataBodyV1::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
let body = TimelineMetadataBodyV2 {
disk_consistent_lsn: body.disk_consistent_lsn,
prev_record_lsn: body.prev_record_lsn,
ancestor_timeline: body.ancestor_timeline,
ancestor_lsn: body.ancestor_lsn,
latest_gc_cutoff_lsn: body.latest_gc_cutoff_lsn,
initdb_lsn: body.initdb_lsn,
pg_version: 14, // All timelines created before this version had pg_version 14
TimelineMetadataBodyV3 {
disk_consistent_lsn: body.disk_consistent_lsn,
prev_record_lsn: body.prev_record_lsn,
ancestor_timeline: body.ancestor_timeline,
ancestor_lsn: body.ancestor_lsn,
latest_gc_cutoff_lsn: body.latest_gc_cutoff_lsn,
initdb_lsn: body.initdb_lsn,
pg_version: 14, // All timelines created before this version had pg_version 14
replica_lsn: None,
}
}
4 => {
let body: TimelineMetadataBodyV2 =
TimelineMetadataBodyV2::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
TimelineMetadataBodyV3 {
disk_consistent_lsn: body.disk_consistent_lsn,
prev_record_lsn: body.prev_record_lsn,
ancestor_timeline: body.ancestor_timeline,
ancestor_lsn: body.ancestor_lsn,
latest_gc_cutoff_lsn: body.latest_gc_cutoff_lsn,
initdb_lsn: body.initdb_lsn,
pg_version: body.pg_version, // pg_version is carried over unchanged from the V2 body
replica_lsn: None,
}
}
_ => bail!("unsupported metadata format version {}", hdr.format_version),
};
hdr.format_version = METADATA_FORMAT_VERSION;
Ok(Self { hdr, body })
@@ -174,7 +207,7 @@ impl TimelineMetadata {
TimelineMetadata::upgrade_timeline_metadata(metadata_bytes)
} else {
let body =
TimelineMetadataBodyV2::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
TimelineMetadataBodyV3::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
ensure!(
body.disk_consistent_lsn.is_aligned(),
"disk_consistent_lsn is not aligned"
@@ -227,6 +260,10 @@ impl TimelineMetadata {
pub fn pg_version(&self) -> u32 {
self.body.pg_version
}
pub fn replica_lsn(&self) -> Option<Lsn> {
self.body.replica_lsn
}
}
/// Save timeline metadata to file
@@ -330,7 +367,7 @@ mod tests {
hdr: TimelineMetadataHeader {
checksum: 0,
size: 0,
format_version: METADATA_OLD_FORMAT_VERSION,
format_version: 3,
},
body: TimelineMetadataBodyV1 {
disk_consistent_lsn: Lsn(0x200),

View File

@@ -348,7 +348,7 @@ pub async fn set_new_tenant_config(
Tenant::persist_tenant_config(
&tenant.tenant_id(),
&tenant_config_path,
new_tenant_conf,
new_tenant_conf.clone(),
false,
)
.map_err(SetNewTenantConfigError::Persist)?;

View File

@@ -12,7 +12,7 @@ use utils::id::{TenantId, TimelineId};
use super::index::LayerFileMetadata;
/// Serializes and uploads the given index part data to the remote storage.
pub(super) async fn upload_index_part<'a>(
pub async fn upload_index_part<'a>(
conf: &'static PageServerConf,
storage: &'a GenericRemoteStorage,
tenant_id: TenantId,

View File

@@ -215,6 +215,18 @@ impl LayerFileName {
Self::Delta(fname) => fname.to_string(),
}
}
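/// LSN range covered by this layer file: a single-LSN range for image layers,
/// the delta's full LSN range for delta layers.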
pub fn get_lsn_range(&self) -> Range<Lsn> {
match self {
Self::Image(fname) => fname.lsn..fname.lsn + 1,
Self::Delta(fname) => fname.lsn_range.clone(),
}
}
pub fn get_key_range(&self) -> Range<Key> {
match self {
Self::Image(fname) => fname.key_range.clone(),
Self::Delta(fname) => fname.key_range.clone(),
}
}
}
impl From<ImageFileName> for LayerFileName {

View File

@@ -144,6 +144,16 @@ pub struct Timeline {
/// See [`storage_sync`] module comment for details.
pub remote_client: Option<Arc<RemoteTimelineClient>>,
/// Master remote storage client (for a cross-region pageserver replica).
/// All layers created by the replica are stored in the local-region S3 bucket, but the
/// pageserver may need to download older layers from the master S3 bucket.
pub master_client: Option<Arc<RemoteTimelineClient>>,
/// Remote consistent LSN at which the cross-region replica was created.
/// All layers whose start LSN is smaller than this point should be downloaded from the master S3 bucket
/// (see master_client).
pub replica_lsn: Option<Lsn>,
// What page versions do we hold in the repository? If we get a
// request > last_record_lsn, we need to wait until we receive all
// the WAL up to the request. The SeqWait provides functions for
@@ -1056,7 +1066,10 @@ impl Timeline {
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) };
let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) };
if self.remote_client.is_none() {
if self
.get_download_source(remote_layer.get_lsn_range().start)
.is_none()
{
return Ok(Some(false));
}
@@ -1370,6 +1383,17 @@ impl Timeline {
}
}
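/// Pick the remote client to download a layer from: layers whose start LSN precedes
/// replica_lsn were produced by the master and live in the master's S3 bucket; newer
/// layers were produced locally and live in the local-region bucket.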
fn get_download_source(&self, layer_start_lsn: Lsn) -> Option<&Arc<RemoteTimelineClient>> {
self.replica_lsn
.map_or(self.remote_client.as_ref(), |replica_lsn| {
if layer_start_lsn < replica_lsn {
self.master_client.as_ref()
} else {
self.remote_client.as_ref()
}
})
}
/// Open a Timeline handle.
///
/// Loads the metadata for the timeline into memory, but not the layer map.
@@ -1383,6 +1407,8 @@ impl Timeline {
tenant_id: TenantId,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
remote_client: Option<RemoteTimelineClient>,
master_client: Option<RemoteTimelineClient>,
replica_lsn: Option<Lsn>,
pg_version: u32,
initial_logical_size_can_start: Option<completion::Barrier>,
initial_logical_size_attempt: Option<completion::Completion>,
@@ -1418,6 +1444,9 @@ impl Timeline {
remote_client: remote_client.map(Arc::new),
master_client: master_client.map(Arc::new),
replica_lsn,
// initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
last_record_lsn: SeqWait::new(RecordLsn {
last: disk_consistent_lsn,
@@ -1705,7 +1734,7 @@ impl Timeline {
Ok(())
}
async fn create_remote_layers(
pub async fn create_remote_layers(
&self,
index_part: &IndexPart,
local_layers: HashMap<LayerFileName, Arc<dyn PersistentLayer>>,
@@ -2925,6 +2954,7 @@ impl Timeline {
*self.latest_gc_cutoff_lsn.read(),
self.initdb_lsn,
self.pg_version,
self.replica_lsn,
);
fail_point!("checkpoint-before-saving-metadata", |x| bail!(
@@ -3087,7 +3117,12 @@ impl Timeline {
layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold))?;
max_deltas = max_deltas.max(num_deltas);
if num_deltas >= threshold {
// Create new image layers if there are at least `threshold` delta layers since the last image layer...
if num_deltas >= threshold
// ...or it is a master layer for a cross-region replica: force generation of an image layer in this case
// to make the replica independent of the master.
|| img_lsn <= self.replica_lsn.unwrap_or(Lsn(0))
{
debug!(
"key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",
img_range.start, img_range.end, num_deltas, img_lsn, lsn
@@ -3284,6 +3319,13 @@ impl Timeline {
let mut level0_deltas = layers.get_level0_deltas()?;
drop(layers);
// Do not compact L0 deltas from the master for a cross-region replica,
// because master and replica layers are distinguished by LSN
// and L0 and L1 layers have the same LSN range
if let Some(replica_lsn) = &self.replica_lsn {
level0_deltas.retain(|l| l.get_lsn_range().start >= *replica_lsn);
}
// Only compact if enough layers have accumulated.
let threshold = self.get_compaction_threshold();
if level0_deltas.is_empty() || level0_deltas.len() < threshold {
@@ -4191,7 +4233,7 @@ impl Timeline {
&format!("download layer {}", remote_layer.short_id()),
false,
async move {
let remote_client = self_clone.remote_client.as_ref().unwrap();
let remote_client = self_clone.get_download_source(remote_layer.get_lsn_range().start).unwrap();
// Does retries + exponential back-off internally.
// When this fails, don't layer further retry attempts here.