diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index df5f5896a1..ac7699d9f4 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -223,6 +223,7 @@ pub struct TenantConfig {
     pub min_resident_size_override: Option<u64>,
     pub evictions_low_residence_duration_metric_threshold: Option<String>,
     pub gc_feedback: Option<bool>,
+    pub master_region: Option<String>,
 }
 
 #[serde_as]
@@ -282,6 +283,7 @@ impl TenantConfigRequest {
             min_resident_size_override: None,
             evictions_low_residence_duration_metric_threshold: None,
             gc_feedback: None,
+            master_region: None,
         };
         TenantConfigRequest { tenant_id, config }
     }
diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs
index e0cc3ca543..b49c01179f 100644
--- a/libs/remote_storage/src/lib.rs
+++ b/libs/remote_storage/src/lib.rs
@@ -341,6 +341,16 @@ impl Debug for S3Config {
 }
 
 impl RemoteStorageConfig {
+    pub fn in_region(&self, region: String) -> anyhow::Result<RemoteStorageConfig> {
+        if let RemoteStorageKind::AwsS3(config) = &self.storage {
+            let mut s3_config = config.clone();
+            s3_config.bucket_region = region;
+            let storage = RemoteStorageKind::AwsS3(s3_config);
+            Ok(RemoteStorageConfig { storage, ..self.clone() })
+        } else {
+            bail!("Only AWS S3 storage can be used in another region")
+        }
+    }
+
     pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<Option<RemoteStorageConfig>> {
         let local_path = toml.get("local_path");
         let bucket_name = toml.get("bucket_name");
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 8fd56941c1..6f254713ac 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -153,6 +153,9 @@ pub struct Tenant {
     // provides access to timeline data sitting in the remote storage
     remote_storage: Option<GenericRemoteStorage>,
 
+    // for cross-region replication: provides access to the master S3 bucket
+    master_storage: Option<GenericRemoteStorage>,
+
     /// Cached logical sizes updated updated on each [`Tenant::gather_size_inputs`].
     cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
     cached_synthetic_tenant_size: Arc<AtomicU64>,
 
@@ -1322,6 +1325,184 @@ impl Tenant {
         Ok(tl)
     }
 
+    pub async fn create_timeline_replica(
+        &self,
+        timeline_id: TimelineId,
+        master_broker_endpoint: String,
+        ctx: &RequestContext,
+    ) -> anyhow::Result<()> {
+        let broker_client =
+            storage_broker::connect(master_broker_endpoint, self.conf.broker_keepalive_interval)
+                .await?;
+        let master_storage = self
+            .master_storage
+            .as_ref()
+            .ok_or_else(|| anyhow::anyhow!("master storage not specified"))?;
+        let master_client = RemoteTimelineClient::new(
+            master_storage.clone(),
+            self.conf,
+            self.tenant_id,
+            timeline_id,
+        );
+        let remote_storage = self
+            .remote_storage
+            .as_ref()
+            .ok_or_else(|| anyhow::anyhow!("remote storage not specified"))?;
+        let remote_client = RemoteTimelineClient::new(
+            remote_storage.clone(),
+            self.conf,
+            self.tenant_id,
+            timeline_id,
+        );
+
+        let remote_timeline_ids = remote_timeline_client::list_remote_timelines(
+            master_storage,
+            self.conf,
+            self.tenant_id,
+        )
+        .await?;
+
+        // Download & parse index parts
+        let mut part_downloads = JoinSet::new();
+        for timeline_id in remote_timeline_ids {
+            let client = RemoteTimelineClient::new(
+                master_storage.clone(),
+                self.conf,
+                self.tenant_id,
+                timeline_id,
+            );
+            part_downloads.spawn(
+                async move {
+                    debug!("starting index part download");
+
+                    let index_part = client
+                        .download_index_file()
+                        .await
+                        .context("download index file")?;
+
+                    debug!("finished index part download");
+
+                    Result::<_, anyhow::Error>::Ok((timeline_id, client, index_part))
+                }
+                .map(move |res| {
+                    res.with_context(|| format!("download index part for timeline {timeline_id}"))
+                })
+                .instrument(info_span!("download_index_part", timeline=%timeline_id)),
+            );
+        }
+
+        // Wait for all the download tasks to complete & collect results.
+        let mut remote_index_and_client = HashMap::new();
+        while let Some(result) = part_downloads.join_next().await {
+            // NB: we already added timeline_id as context to the error
+            let result: Result<_, anyhow::Error> = result.context("joinset task join")?;
+            let (timeline_id, client, index_part) = result?;
+            debug!("successfully downloaded index part for timeline {timeline_id}");
+            match index_part {
+                MaybeDeletedIndexPart::IndexPart(index_part) => {
+                    remote_index_and_client.insert(timeline_id, (index_part, client));
+                }
+                MaybeDeletedIndexPart::Deleted(_) => {
+                    info!("timeline {} is deleted, skipping", timeline_id);
+                    continue;
+                }
+            }
+        }
+
+        let (index_part, _client) = remote_index_and_client
+            .get(&timeline_id)
+            .ok_or_else(|| anyhow::anyhow!("timeline {timeline_id} not found at master"))?;
+        let mut timeline_metadata = index_part.parse_metadata().context("parse_metadata")?;
+        // Bootstrap the replica from the image layers covering the whole key
+        // range, sorted by the start of their LSN range.
+        let mut layers: Vec<(LayerFileName, IndexLayerMetadata)> = index_part
+            .layer_metadata
+            .iter()
+            .filter(|(fname, _meta)| fname.key_range() == (Key::MIN..Key::MAX))
+            .map(|(fname, meta)| (fname.clone(), meta.clone()))
+            .collect();
+        layers.sort_by_key(|(fname, _meta)| fname.lsn_range().start);
+        let replica_lsn = layers
+            .last()
+            .map_or(Lsn(0), |(fname, _meta)| fname.lsn_range().end);
+        let mut layer_metadata: HashMap<LayerFileName, IndexLayerMetadata> =
+            layers.iter().cloned().collect();
+        let old_metadata = timeline_metadata.clone();
+        // Walk up the ancestor chain and pull in every layer below replica_lsn.
+        while let Some(ancestor_id) = timeline_metadata.ancestor_timeline() {
+            let (index_part, _client) = remote_index_and_client
+                .get(&ancestor_id)
+                .ok_or_else(|| anyhow::anyhow!("ancestor {ancestor_id} not found at master"))?;
+            for (fname, meta) in &index_part.layer_metadata {
+                if fname.lsn_range().start < replica_lsn {
+                    layer_metadata.insert(fname.clone(), meta.clone());
+                }
+            }
+            timeline_metadata = index_part.parse_metadata().context("parse_metadata")?;
+        }
+        let new_metadata = TimelineMetadata::new(
+            old_metadata.disk_consistent_lsn(),
+            old_metadata.prev_record_lsn(),
+            None, // the flattened layer set has no ancestor
+            Lsn::INVALID,
+            old_metadata.latest_gc_cutoff_lsn(),
+            old_metadata.initdb_lsn(),
+            old_metadata.pg_version(),
+            Some(replica_lsn),
+        );
+
+        // Initialize data directories for new timeline
+        tokio::fs::create_dir_all(self.conf.timeline_path(&timeline_id, &self.tenant_id))
+            .await
+            .context("Failed to create new timeline directory")?;
+
+        // Save timeline metadata
+        save_metadata(self.conf, timeline_id, self.tenant_id, &new_metadata, true)
+            .context("Failed to create timeline metadata")?;
+
+        // Construct the new index_part.json with the combined list of layers,
+        // before `new_metadata` is moved into the Timeline below.
+        let index_part = IndexPart::new(
+            layer_metadata,
+            old_metadata.disk_consistent_lsn(),
+            new_metadata.to_bytes()?,
+        );
+
+        let timeline = Timeline::new(
+            self.conf,
+            Arc::clone(&self.tenant_conf),
+            new_metadata,
+            None, // we do not need to restore the branch hierarchy at the replica
+            timeline_id,
+            self.tenant_id,
+            Arc::clone(&self.walredo_mgr),
+            Some(remote_client),
+            Some(master_client),
+            Some(replica_lsn),
+            old_metadata.pg_version(),
+            None, // no need to calculate logical size at the replica
+            None,
+        );
+        // Upload this index_part.json to the replica's own S3 bucket
+        upload_index_part(
+            self.conf,
+            remote_storage,
+            self.tenant_id,
+            timeline_id,
+            &index_part,
+        )
+        .await?;
+
+        // Start background tasks for this timeline
+        timeline.activate(broker_client, true);
+
+        // Wait for uploads to complete, so that when we return Ok, the timeline
+        // is known to be durable on remote storage. Just like we do at the end of
+        // this function, after we have created the timeline ourselves.
+        //
+        // We only really care that the initial version of `index_part.json` has
+        // been uploaded. That's enough to remember that the timeline
+        // exists. However, there is no function to wait specifically for that so
+        // we just wait for all in-progress uploads to finish.
+        timeline
+            .remote_client
+            .as_ref()
+            .expect("remote client was just created above")
+            .wait_completion()
+            .await
+            .context("wait for timeline uploads to complete")?;
+
+        Ok(())
+    }
+
     /// Create a new timeline.
     ///
     /// Returns the new timeline ID and reference to its Timeline object.
@@ -2266,6 +2447,16 @@ impl Tenant {
         let initial_logical_size_can_start = init_order.map(|x| &x.initial_logical_size_can_start);
         let initial_logical_size_attempt = init_order.map(|x| &x.initial_logical_size_attempt);
 
+        let master_client = if let Some(master_storage) = &self.master_storage {
+            Some(RemoteTimelineClient::new(
+                master_storage.clone(),
+                self.conf,
+                self.tenant_id,
+                new_timeline_id,
+            ))
+        } else {
+            None
+        };
         let pg_version = new_metadata.pg_version();
         Ok(Timeline::new(
             self.conf,
@@ -2276,8 +2467,8 @@ impl Tenant {
             self.tenant_id,
             Arc::clone(&self.walredo_mgr),
             remote_client,
-            None,
-            None,
+            master_client,
+            new_metadata.replica_lsn(),
             pg_version,
             initial_logical_size_can_start.cloned(),
             initial_logical_size_attempt.cloned(),
@@ -2294,6 +2485,17 @@ impl Tenant {
     ) -> Tenant {
         let (state, mut rx) = watch::channel(state);
 
+        let master_storage = if let Some(remote_storage_config) = &conf.remote_storage_config {
+            if let Some(region) = &tenant_conf.master_region {
+                let master_storage_config = remote_storage_config
+                    .in_region(region.clone())
+                    .expect("failed to derive master storage config");
+                Some(
+                    GenericRemoteStorage::from_config(&master_storage_config)
+                        .expect("failed to create master storage"),
+                )
+            } else {
+                None
+            }
+        } else {
+            None
+        };
+
         tokio::spawn(async move {
             let mut current_state: &'static str = From::from(&*rx.borrow_and_update());
             let tid = tenant_id.to_string();
@@ -2332,6 +2534,7 @@ impl Tenant {
             gc_cs: tokio::sync::Mutex::new(()),
             walredo_mgr,
             remote_storage,
+            master_storage,
             state,
             cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
             cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs
index 80d153661a..2b18606b85 100644
--- a/pageserver/src/tenant/config.rs
+++ b/pageserver/src/tenant/config.rs
@@ -100,6 +100,8 @@ pub struct TenantConf {
     #[serde(with = "humantime_serde")]
     pub evictions_low_residence_duration_metric_threshold: Duration,
     pub gc_feedback: bool,
+    // Region of the master S3 bucket
+    pub master_region: Option<String>,
 }
 
 /// Same as TenantConf, but this struct preserves the information about
@@ -180,6 +182,10 @@ pub struct TenantConfOpt {
     #[serde(skip_serializing_if = "Option::is_none")]
     #[serde(default)]
     pub gc_feedback: Option<bool>,
+
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(default)]
+    pub master_region: Option<String>,
 }
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
@@ -248,6 +254,7 @@ impl TenantConfOpt {
                 .evictions_low_residence_duration_metric_threshold
                 .unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold),
             gc_feedback: self.gc_feedback.unwrap_or(global_conf.gc_feedback),
+            master_region: self.master_region.clone(),
         }
     }
 }
@@ -285,6 +292,7 @@ impl Default for TenantConf {
             )
             .expect("cannot parse default evictions_low_residence_duration_metric_threshold"),
             gc_feedback: false,
+            master_region: None,
         }
     }
 }
@@ -380,6 +388,7 @@ impl TryFrom<&'_ models::TenantConfig> for TenantConfOpt {
             );
         }
         tenant_conf.gc_feedback = request_data.gc_feedback;
+        tenant_conf.master_region = request_data.master_region.clone();
 
         Ok(tenant_conf)
     }
diff --git a/pageserver/src/tenant/metadata.rs b/pageserver/src/tenant/metadata.rs
index 1ea61fa26b..198b87ffc6 100644
--- a/pageserver/src/tenant/metadata.rs
+++ b/pageserver/src/tenant/metadata.rs
@@ -23,7 +23,7 @@ use crate::config::PageServerConf;
 use crate::virtual_file::VirtualFile;
 
 /// Use special format number to enable backward compatibility.
-const METADATA_FORMAT_VERSION: u16 = 4;
+const METADATA_FORMAT_VERSION: u16 = 5;
 
 /// Previous supported format versions.
 const METADATA_OLD_FORMAT_VERSION: u16 = 3;
@@ -40,7 +40,7 @@ const METADATA_MAX_SIZE: usize = 512;
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct TimelineMetadata {
     hdr: TimelineMetadataHeader,
-    body: TimelineMetadataBodyV2,
+    body: TimelineMetadataBodyV3,
 }
 
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -72,6 +72,28 @@ struct TimelineMetadataBodyV2 {
     pg_version: u32,
 }
 
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+struct TimelineMetadataBodyV3 {
+    disk_consistent_lsn: Lsn,
+    // This is only set if we know it. We track it in memory when the page
+    // server is running, but we only track the value corresponding to
+    // 'last_record_lsn', not 'disk_consistent_lsn' which can lag behind by a
+    // lot. We only store it in the metadata file when we flush *all* the
+    // in-memory data so that 'last_record_lsn' is the same as
+    // 'disk_consistent_lsn'. That's OK, because after page server restart, as
+    // soon as we reprocess at least one record, we will have a valid
+    // 'prev_record_lsn' value in memory again. This is only really needed when
+    // doing a clean shutdown, so that there is no more WAL beyond
+    // 'disk_consistent_lsn'
+    prev_record_lsn: Option<Lsn>,
+    ancestor_timeline: Option<TimelineId>,
+    ancestor_lsn: Lsn,
+    latest_gc_cutoff_lsn: Lsn,
+    initdb_lsn: Lsn,
+    pg_version: u32,
+    replica_lsn: Option<Lsn>,
+}
+
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 struct TimelineMetadataBodyV1 {
     disk_consistent_lsn: Lsn,
@@ -101,6 +123,7 @@ impl TimelineMetadata {
         latest_gc_cutoff_lsn: Lsn,
         initdb_lsn: Lsn,
         pg_version: u32,
+        replica_lsn: Option<Lsn>,
     ) -> Self {
         Self {
             hdr: TimelineMetadataHeader {
@@ -108,7 +131,7 @@ impl TimelineMetadata {
                 size: 0,
                 format_version: METADATA_FORMAT_VERSION,
             },
-            body: TimelineMetadataBodyV2 {
+            body: TimelineMetadataBodyV3 {
                 disk_consistent_lsn,
                 prev_record_lsn,
                 ancestor_timeline,
@@ -116,35 +139,48 @@ impl TimelineMetadata {
                 latest_gc_cutoff_lsn,
                 initdb_lsn,
                 pg_version,
+                replica_lsn,
             },
         }
     }
 
     fn upgrade_timeline_metadata(metadata_bytes: &[u8]) -> anyhow::Result<Self> {
         let mut hdr = TimelineMetadataHeader::des(&metadata_bytes[0..METADATA_HDR_SIZE])?;
-
-        // backward compatible only up to this version
-        ensure!(
-            hdr.format_version == METADATA_OLD_FORMAT_VERSION,
-            "unsupported metadata format version {}",
-            hdr.format_version
-        );
-
         let metadata_size = hdr.size as usize;
-        let body: TimelineMetadataBodyV1 =
-            TimelineMetadataBodyV1::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
+        let body = match hdr.format_version {
+            3 => {
+                let body: TimelineMetadataBodyV1 =
+                    TimelineMetadataBodyV1::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
 
-        let body = TimelineMetadataBodyV2 {
-            disk_consistent_lsn: body.disk_consistent_lsn,
-            prev_record_lsn: body.prev_record_lsn,
-            ancestor_timeline: body.ancestor_timeline,
-            ancestor_lsn: body.ancestor_lsn,
-            latest_gc_cutoff_lsn: body.latest_gc_cutoff_lsn,
-            initdb_lsn: body.initdb_lsn,
-            pg_version: 14, // All timelines created before this version had pg_version 14
+                TimelineMetadataBodyV3 {
+                    disk_consistent_lsn: body.disk_consistent_lsn,
+                    prev_record_lsn: body.prev_record_lsn,
+                    ancestor_timeline: body.ancestor_timeline,
+                    ancestor_lsn: body.ancestor_lsn,
+                    latest_gc_cutoff_lsn: body.latest_gc_cutoff_lsn,
+                    initdb_lsn: body.initdb_lsn,
+                    pg_version: 14, // All timelines created before this version had pg_version 14
+                    replica_lsn: None,
+                }
+            }
+            4 => {
+                let body: TimelineMetadataBodyV2 =
+                    TimelineMetadataBodyV2::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
+
+                TimelineMetadataBodyV3 {
+                    disk_consistent_lsn: body.disk_consistent_lsn,
+                    prev_record_lsn: body.prev_record_lsn,
+                    ancestor_timeline: body.ancestor_timeline,
+                    ancestor_lsn: body.ancestor_lsn,
+                    latest_gc_cutoff_lsn: body.latest_gc_cutoff_lsn,
+                    initdb_lsn: body.initdb_lsn,
+                    pg_version: body.pg_version, // V2 already stores pg_version, carry it over
+                    replica_lsn: None,
+                }
+            }
+            _ => bail!("unsupported metadata format version {}", hdr.format_version),
         };
-
         hdr.format_version = METADATA_FORMAT_VERSION;
 
         Ok(Self { hdr, body })
@@ -174,7 +210,7 @@ impl TimelineMetadata {
             TimelineMetadata::upgrade_timeline_metadata(metadata_bytes)
         } else {
             let body =
-                TimelineMetadataBodyV2::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
+                TimelineMetadataBodyV3::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
             ensure!(
                 body.disk_consistent_lsn.is_aligned(),
                 "disk_consistent_lsn is not aligned"
diff --git a/pageserver/src/tenant/remote_timeline_client/upload.rs b/pageserver/src/tenant/remote_timeline_client/upload.rs
index b520bb4b0c..d209559dff 100644
--- a/pageserver/src/tenant/remote_timeline_client/upload.rs
+++ b/pageserver/src/tenant/remote_timeline_client/upload.rs
@@ -12,7 +12,7 @@ use utils::id::{TenantId, TimelineId};
 use super::index::LayerFileMetadata;
 
 /// Serializes and uploads the given index part data to the remote storage.
-pub(super) async fn upload_index_part<'a>(
+pub async fn upload_index_part<'a>(
     conf: &'static PageServerConf,
     storage: &'a GenericRemoteStorage,
     tenant_id: TenantId,
diff --git a/pageserver/src/tenant/storage_layer/filename.rs b/pageserver/src/tenant/storage_layer/filename.rs
index 5dcd54689e..7f48378e16 100644
--- a/pageserver/src/tenant/storage_layer/filename.rs
+++ b/pageserver/src/tenant/storage_layer/filename.rs
@@ -215,6 +215,18 @@ impl LayerFileName {
             Self::Delta(fname) => fname.to_string(),
         }
     }
+    pub fn lsn_range(&self) -> Range<Lsn> {
+        match self {
+            Self::Image(fname) => fname.lsn..fname.lsn + 1,
+            Self::Delta(fname) => fname.lsn_range.clone(),
+        }
+    }
+    pub fn key_range(&self) -> Range<Key> {
+        match self {
+            Self::Image(fname) => fname.key_range.clone(),
+            Self::Delta(fname) => fname.key_range.clone(),
+        }
+    }
 }
 
 impl From for LayerFileName {
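Reviewer sketch, not part of the patch: how the new configuration pieces are intended to compose. `RemoteStorageConfig::in_region` and `GenericRemoteStorage::from_config` come from the patch and the remote_storage crate; the `make_master_storage` helper and its signature are hypothetical, shown only to illustrate the call pattern used in `Tenant::new`:

    use anyhow::Context;
    use remote_storage::{GenericRemoteStorage, RemoteStorageConfig};

    // Hypothetical helper: derive a storage handle for the master region
    // if the tenant is a replica (i.e. its TenantConf carries a master_region).
    fn make_master_storage(
        base: &RemoteStorageConfig,
        master_region: Option<&str>,
    ) -> anyhow::Result<Option<GenericRemoteStorage>> {
        match master_region {
            Some(region) => {
                // in_region clones the S3 config and swaps only the region;
                // bucket name, prefix and sync limits stay the same.
                let master_config = base
                    .in_region(region.to_string())
                    .context("derive master storage config")?;
                Ok(Some(GenericRemoteStorage::from_config(&master_config)?))
            }
            None => Ok(None),
        }
    }

With this shape, a tenant with no master_region pays nothing, and a replica tenant gets a second GenericRemoteStorage pointing at the same bucket layout in the master's region.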
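The subtle part of create_timeline_replica is layer selection: the replica bootstraps from the newest image layer covering the full key range, then merges in every ancestor layer whose LSN range starts below that point. A self-contained sketch of the replica_lsn computation, with plain tuples standing in for LayerFileName/IndexLayerMetadata (all types here are simplified stand-ins, not the pageserver's):

    use std::ops::Range;

    type Lsn = u64;

    // (key_range, lsn_range) pairs standing in for layer file names.
    fn replica_lsn(layers: &[(Range<u32>, Range<Lsn>)]) -> Lsn {
        let full_key_range = u32::MIN..u32::MAX;
        let mut full_images: Vec<_> = layers
            .iter()
            .filter(|(keys, _)| *keys == full_key_range)
            .collect();
        // Sort by the start of the LSN range; the last entry is the newest
        // full image, and its end LSN is where the replica starts.
        full_images.sort_by_key(|(_, lsns)| lsns.start);
        full_images.last().map_or(0, |(_, lsns)| lsns.end)
    }

    fn main() {
        let layers = vec![
            (u32::MIN..u32::MAX, 100..101), // full image layer at LSN 100
            (10..20, 100..180),             // partial delta layer, ignored
            (u32::MIN..u32::MAX, 200..201), // newer full image layer
        ];
        // Ancestor layers with lsn_range.start < 201 would then be merged in.
        assert_eq!(replica_lsn(&layers), 201);
    }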