Create replica timeline in another region
@@ -223,6 +223,7 @@ pub struct TenantConfig {
     pub min_resident_size_override: Option<u64>,
     pub evictions_low_residence_duration_metric_threshold: Option<String>,
     pub gc_feedback: Option<bool>,
+    pub master_region: Option<String>,
 }

 #[serde_as]
@@ -282,6 +283,7 @@ impl TenantConfigRequest {
             min_resident_size_override: None,
             evictions_low_residence_duration_metric_threshold: None,
             gc_feedback: None,
+            master_region: None,
         };
         TenantConfigRequest { tenant_id, config }
     }
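
For illustration, this is how a tenant could be marked as a cross-region replica through the per-tenant config introduced here. A minimal caller-side sketch; the `new` constructor shape is inferred from the hunk above, and the region value is an example, not something this commit prescribes:

    // Hypothetical sketch: point a tenant at the master bucket's region
    // via the new per-tenant option.
    let mut request = TenantConfigRequest::new(tenant_id);
    request.config.master_region = Some("us-east-1".to_string()); // example value

The pageserver then derives a second remote-storage handle for that region; see RemoteStorageConfig::in_region below.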
@@ -341,6 +341,16 @@ impl Debug for S3Config {
 }

 impl RemoteStorageConfig {
+    pub fn in_region(&self, region: String) -> anyhow::Result<RemoteStorageConfig> {
+        if let AwsS3(config) = &self.storage {
+            let mut s3_config = config.clone();
+            s3_config.region = region;
+            Ok(RemoteStorageConfig {
+                storage: AwsS3(s3_config),
+                ..self.clone()
+            })
+        } else {
+            bail!("Only AWS S3 storage can be used in another region")
+        }
+    }
+
     pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<Option<RemoteStorageConfig>> {
         let local_path = toml.get("local_path");
         let bucket_name = toml.get("bucket_name");
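
Given a pageserver whose base remote-storage config points at the local region's bucket, deriving the master-side config is then a single call. A sketch under that assumption (the region string is illustrative):

    // Swap only the S3 region; bucket name, prefix and limits carry over
    // via the `..self.clone()` update above.
    let master_config = base_config.in_region("eu-west-1".to_string())?;
    let master_storage = GenericRemoteStorage::from_config(&master_config)?;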
@@ -153,6 +153,9 @@ pub struct Tenant {
     // provides access to timeline data sitting in the remote storage
     remote_storage: Option<GenericRemoteStorage>,

+    // for cross-region replication: provides access to the master S3 bucket
+    master_storage: Option<GenericRemoteStorage>,
+
     /// Cached logical sizes updated on each [`Tenant::gather_size_inputs`].
     cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
     cached_synthetic_tenant_size: Arc<AtomicU64>,
@@ -1322,6 +1325,184 @@ impl Tenant {
         Ok(tl)
     }

+    pub async fn create_timeline_replica(
+        &self,
+        timeline_id: TimelineId,
+        master_broker_endpoint: String,
+        ctx: &RequestContext,
+    ) -> anyhow::Result<()> {
+        let broker_client =
+            storage_broker::connect(master_broker_endpoint, self.conf.broker_keepalive_interval)
+                .await?;
+        let master_storage = self
+            .master_storage
+            .as_ref()
+            .ok_or_else(|| anyhow::anyhow!("master storage not specified"))?;
+        let master_client = RemoteTimelineClient::new(
+            master_storage.clone(),
+            self.conf,
+            self.tenant_id,
+            timeline_id,
+        );
+        let remote_storage = self
+            .remote_storage
+            .as_ref()
+            .ok_or_else(|| anyhow::anyhow!("remote storage not specified"))?;
+        let remote_client = RemoteTimelineClient::new(
+            remote_storage.clone(),
+            self.conf,
+            self.tenant_id,
+            timeline_id,
+        );
+
+        let remote_timeline_ids = remote_timeline_client::list_remote_timelines(
+            master_storage,
+            self.conf,
+            self.tenant_id,
+        )
+        .await?;
+
+        // Download & parse index parts
+        let mut part_downloads = JoinSet::new();
+        for timeline_id in remote_timeline_ids {
+            let client = RemoteTimelineClient::new(
+                master_storage.clone(),
+                self.conf,
+                self.tenant_id,
+                timeline_id,
+            );
+            part_downloads.spawn(
+                async move {
+                    debug!("starting index part download");
+
+                    let index_part = client
+                        .download_index_file()
+                        .await
+                        .context("download index file")?;
+
+                    debug!("finished index part download");
+
+                    Result::<_, anyhow::Error>::Ok((timeline_id, client, index_part))
+                }
+                .map(move |res| {
+                    res.with_context(|| format!("download index part for timeline {timeline_id}"))
+                })
+                .instrument(info_span!("download_index_part", timeline=%timeline_id)),
+            );
+        }
+        // Wait for all the download tasks to complete & collect results.
+        let mut remote_index_and_client = HashMap::new();
+        while let Some(result) = part_downloads.join_next().await {
+            // NB: we already added timeline_id as context to the error
+            let result: Result<_, anyhow::Error> = result.context("joinset task join")?;
+            let (timeline_id, client, index_part) = result?;
+            debug!("successfully downloaded index part for timeline {timeline_id}");
+            match index_part {
+                MaybeDeletedIndexPart::IndexPart(index_part) => {
+                    remote_index_and_client.insert(timeline_id, (index_part, client));
+                }
+                MaybeDeletedIndexPart::Deleted(_) => {
+                    info!("timeline {} is deleted, skipping", timeline_id);
+                    continue;
+                }
+            }
+        }
+
+        let (index_part, _client) = remote_index_and_client
+            .get(&timeline_id)
+            .ok_or_else(|| anyhow::anyhow!("timeline {timeline_id} not found at master"))?;
+        let mut timeline_metadata = index_part.parse_metadata().context("parse_metadata")?;
+        // Collect the full-key-range layers of the target timeline, ordered by start LSN.
+        let mut layers: Vec<(LayerFileName, IndexLayerMetadata)> = index_part
+            .layer_metadata
+            .iter()
+            .filter(|(fname, _meta)| range_eq(&fname.key_range(), &(Key::MIN..Key::MAX)))
+            .map(|(fname, meta)| (fname.clone(), meta.clone()))
+            .collect();
+        layers.sort_by_key(|(fname, _meta)| fname.lsn_range().start);
+        // The replica is cut off at the end LSN of the last such layer.
+        let replica_lsn = layers
+            .last()
+            .map_or(Lsn(0), |(fname, _meta)| fname.lsn_range().end);
+        let mut layer_metadata: HashMap<LayerFileName, LayerFileMetadata> = layers
+            .iter()
+            .map(|(fname, meta)| (fname.clone(), LayerFileMetadata::from(meta)))
+            .collect();
+        let old_metadata = timeline_metadata.clone();
+        // Walk up the ancestry, pulling in all ancestor layers below replica_lsn.
+        while let Some(ancestor_id) = timeline_metadata.ancestor_timeline() {
+            let (index_part, _client) = remote_index_and_client
+                .get(&ancestor_id)
+                .ok_or_else(|| {
+                    anyhow::anyhow!("ancestor timeline {ancestor_id} not found at master")
+                })?;
+            for (fname, meta) in &index_part.layer_metadata {
+                if fname.lsn_range().start < replica_lsn {
+                    layer_metadata.insert(fname.clone(), LayerFileMetadata::from(meta));
+                }
+            }
+            timeline_metadata = index_part.parse_metadata().context("parse_metadata")?;
+        }
+        let new_metadata = TimelineMetadata::new(
+            old_metadata.disk_consistent_lsn(),
+            old_metadata.prev_record_lsn(),
+            None,
+            Lsn::INVALID,
+            old_metadata.latest_gc_cutoff_lsn(),
+            old_metadata.initdb_lsn(),
+            old_metadata.pg_version(),
+            Some(replica_lsn),
+        );
+
+        // Initialize data directories for the new timeline
+        tokio::fs::create_dir_all(self.conf.timeline_path(&timeline_id, &self.tenant_id))
+            .await
+            .context("Failed to create new timeline directory")?;
+
+        // Save timeline metadata
+        save_metadata(self.conf, timeline_id, self.tenant_id, &new_metadata, true)
+            .context("Failed to create timeline metadata")?;
+
+        let timeline = Timeline::new(
+            self.conf,
+            Arc::clone(&self.tenant_conf),
+            new_metadata.clone(),
+            None, // no need to restore the branch hierarchy at the replica
+            timeline_id,
+            self.tenant_id,
+            Arc::clone(&self.walredo_mgr),
+            Some(remote_client),
+            Some(master_client),
+            Some(replica_lsn),
+            old_metadata.pg_version(),
+            None, // no need to calculate logical size at the replica
+            None,
+        );
+        // Construct a new index_part.json with the combined list of layers
+        let index_part = IndexPart::new(
+            layer_metadata,
+            old_metadata.disk_consistent_lsn(),
+            new_metadata.to_bytes()?,
+        );
+        // Upload this index_part.json to the S3 bucket, via the
+        // upload_index_part helper that this commit makes pub.
+        upload_index_part(
+            self.conf,
+            remote_storage,
+            self.tenant_id,
+            timeline_id,
+            &index_part,
+        )
+        .await?;
+
+        // Start background tasks for this timeline
+        timeline.activate(broker_client, true);
+
+        // Wait for uploads to complete, so that when we return Ok, the timeline
+        // is known to be durable on remote storage. Just like we do at the end
+        // of Tenant::create_timeline, after we have created the timeline there.
+        //
+        // We only really care that the initial version of `index_part.json` has
+        // been uploaded. That's enough to remember that the timeline
+        // exists. However, there is no function to wait specifically for that so
+        // we just wait for all in-progress uploads to finish.
+        if let Some(remote_client) = timeline.remote_client.as_ref() {
+            remote_client
+                .wait_completion()
+                .await
+                .context("wait for timeline uploads to complete")?;
+        }
+        Ok(())
+    }
+
     /// Create a new timeline.
     ///
     /// Returns the new timeline ID and reference to its Timeline object.
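
For orientation, a call-site sketch for the new method. The tenant lookup helper and the broker endpoint string are illustrative assumptions, not part of this commit:

    // Hypothetical caller: on a pageserver in the secondary region,
    // materialize a replica of an existing timeline, reading layer
    // metadata from the master region's bucket.
    let tenant = mgr::get_tenant(tenant_id, true).await?;
    tenant
        .create_timeline_replica(
            timeline_id,
            "http://master-broker.example.com:50051".to_string(),
            &ctx,
        )
        .await?;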
@@ -2266,6 +2447,16 @@ impl Tenant {
         let initial_logical_size_can_start = init_order.map(|x| &x.initial_logical_size_can_start);
         let initial_logical_size_attempt = init_order.map(|x| &x.initial_logical_size_attempt);

+        let master_client = if let Some(master_storage) = &self.master_storage {
+            Some(RemoteTimelineClient::new(
+                master_storage.clone(),
+                self.conf,
+                self.tenant_id,
+                new_timeline_id,
+            ))
+        } else {
+            None
+        };
         let pg_version = new_metadata.pg_version();
         Ok(Timeline::new(
             self.conf,
@@ -2276,8 +2467,8 @@ impl Tenant {
             self.tenant_id,
             Arc::clone(&self.walredo_mgr),
             remote_client,
-            None,
-            None,
+            master_client,
+            new_metadata.replica_lsn(),
             pg_version,
             initial_logical_size_can_start.cloned(),
             initial_logical_size_attempt.cloned(),
@@ -2294,6 +2485,17 @@ impl Tenant {
     ) -> Tenant {
         let (state, mut rx) = watch::channel(state);

+        let master_storage = if let Some(remote_storage_config) = &conf.remote_storage_config {
+            if let Some(region) = &tenant_conf.master_region {
+                let master_storage_config = remote_storage_config
+                    .in_region(region.clone())
+                    .expect("failed to derive master storage config");
+                Some(
+                    GenericRemoteStorage::from_config(&master_storage_config)
+                        .expect("failed to create master storage"),
+                )
+            } else {
+                None
+            }
+        } else {
+            None
+        };
+
         tokio::spawn(async move {
             let mut current_state: &'static str = From::from(&*rx.borrow_and_update());
             let tid = tenant_id.to_string();
@@ -2332,6 +2534,7 @@ impl Tenant {
             gc_cs: tokio::sync::Mutex::new(()),
             walredo_mgr,
             remote_storage,
+            master_storage,
             state,
             cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
             cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
@@ -100,6 +100,8 @@ pub struct TenantConf {
     #[serde(with = "humantime_serde")]
     pub evictions_low_residence_duration_metric_threshold: Duration,
     pub gc_feedback: bool,
+    // Region for master S3 bucket
+    pub master_region: Option<String>,
 }

 /// Same as TenantConf, but this struct preserves the information about
@@ -180,6 +182,10 @@ pub struct TenantConfOpt {
     #[serde(skip_serializing_if = "Option::is_none")]
     #[serde(default)]
    pub gc_feedback: Option<bool>,

+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(default)]
+    pub master_region: Option<String>,
 }

 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
@@ -248,6 +254,7 @@ impl TenantConfOpt {
                 .evictions_low_residence_duration_metric_threshold
                 .unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold),
             gc_feedback: self.gc_feedback.unwrap_or(global_conf.gc_feedback),
+            master_region: self.master_region.clone(),
         }
     }
 }
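
Note the merge above: every other knob falls back to the global TenantConf, while master_region stays purely per-tenant (the global default below is None). A sketch of the resulting behavior, assuming the usual merge helper and a Default impl for TenantConfOpt:

    let global = TenantConf::default(); // master_region: None
    let mut overrides = TenantConfOpt::default();
    overrides.master_region = Some("us-east-1".to_string()); // example value
    let effective = overrides.merge(global);
    assert_eq!(effective.master_region.as_deref(), Some("us-east-1"));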
@@ -285,6 +292,7 @@ impl Default for TenantConf {
             )
             .expect("cannot parse default evictions_low_residence_duration_metric_threshold"),
             gc_feedback: false,
+            master_region: None,
         }
     }
 }
@@ -380,6 +388,7 @@ impl TryFrom<&'_ models::TenantConfig> for TenantConfOpt {
             );
         }
         tenant_conf.gc_feedback = request_data.gc_feedback;
+        tenant_conf.master_region = request_data.master_region.clone();

         Ok(tenant_conf)
     }
@@ -23,7 +23,7 @@ use crate::config::PageServerConf;
 use crate::virtual_file::VirtualFile;

 /// Use special format number to enable backward compatibility.
-const METADATA_FORMAT_VERSION: u16 = 4;
+const METADATA_FORMAT_VERSION: u16 = 5;

 /// Previous supported format versions.
 const METADATA_OLD_FORMAT_VERSION: u16 = 3;
@@ -40,7 +40,7 @@ const METADATA_MAX_SIZE: usize = 512;
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct TimelineMetadata {
     hdr: TimelineMetadataHeader,
-    body: TimelineMetadataBodyV2,
+    body: TimelineMetadataBodyV3,
 }

 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -72,6 +72,28 @@ struct TimelineMetadataBodyV2 {
     pg_version: u32,
 }

+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+struct TimelineMetadataBodyV3 {
+    disk_consistent_lsn: Lsn,
+    // This is only set if we know it. We track it in memory when the page
+    // server is running, but we only track the value corresponding to
+    // 'last_record_lsn', not 'disk_consistent_lsn' which can lag behind by a
+    // lot. We only store it in the metadata file when we flush *all* the
+    // in-memory data so that 'last_record_lsn' is the same as
+    // 'disk_consistent_lsn'. That's OK, because after page server restart, as
+    // soon as we reprocess at least one record, we will have a valid
+    // 'prev_record_lsn' value in memory again. This is only really needed when
+    // doing a clean shutdown, so that there is no more WAL beyond
+    // 'disk_consistent_lsn'
+    prev_record_lsn: Option<Lsn>,
+    ancestor_timeline: Option<TimelineId>,
+    ancestor_lsn: Lsn,
+    latest_gc_cutoff_lsn: Lsn,
+    initdb_lsn: Lsn,
+    pg_version: u32,
+    replica_lsn: Option<Lsn>,
+}
+
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 struct TimelineMetadataBodyV1 {
     disk_consistent_lsn: Lsn,
@@ -101,6 +123,7 @@ impl TimelineMetadata {
         latest_gc_cutoff_lsn: Lsn,
         initdb_lsn: Lsn,
         pg_version: u32,
+        replica_lsn: Option<Lsn>,
     ) -> Self {
         Self {
             hdr: TimelineMetadataHeader {
@@ -108,7 +131,7 @@ impl TimelineMetadata {
                 size: 0,
                 format_version: METADATA_FORMAT_VERSION,
             },
-            body: TimelineMetadataBodyV2 {
+            body: TimelineMetadataBodyV3 {
                 disk_consistent_lsn,
                 prev_record_lsn,
                 ancestor_timeline,
@@ -116,35 +139,48 @@ impl TimelineMetadata {
                 latest_gc_cutoff_lsn,
                 initdb_lsn,
                 pg_version,
+                replica_lsn,
             },
         }
     }

     fn upgrade_timeline_metadata(metadata_bytes: &[u8]) -> anyhow::Result<Self> {
         let mut hdr = TimelineMetadataHeader::des(&metadata_bytes[0..METADATA_HDR_SIZE])?;

-        // backward compatible only up to this version
-        ensure!(
-            hdr.format_version == METADATA_OLD_FORMAT_VERSION,
-            "unsupported metadata format version {}",
-            hdr.format_version
-        );
-
         let metadata_size = hdr.size as usize;

-        let body: TimelineMetadataBodyV1 =
-            TimelineMetadataBodyV1::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
-
-        let body = TimelineMetadataBodyV2 {
-            disk_consistent_lsn: body.disk_consistent_lsn,
-            prev_record_lsn: body.prev_record_lsn,
-            ancestor_timeline: body.ancestor_timeline,
-            ancestor_lsn: body.ancestor_lsn,
-            latest_gc_cutoff_lsn: body.latest_gc_cutoff_lsn,
-            initdb_lsn: body.initdb_lsn,
-            pg_version: 14, // All timelines created before this version had pg_version 14
-        };
+        let body = match hdr.format_version {
+            3 => {
+                let body: TimelineMetadataBodyV1 =
+                    TimelineMetadataBodyV1::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
+
+                TimelineMetadataBodyV3 {
+                    disk_consistent_lsn: body.disk_consistent_lsn,
+                    prev_record_lsn: body.prev_record_lsn,
+                    ancestor_timeline: body.ancestor_timeline,
+                    ancestor_lsn: body.ancestor_lsn,
+                    latest_gc_cutoff_lsn: body.latest_gc_cutoff_lsn,
+                    initdb_lsn: body.initdb_lsn,
+                    pg_version: 14, // All timelines created before this version had pg_version 14
+                    replica_lsn: None,
+                }
+            }
+            4 => {
+                let body: TimelineMetadataBodyV2 =
+                    TimelineMetadataBodyV2::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
+
+                TimelineMetadataBodyV3 {
+                    disk_consistent_lsn: body.disk_consistent_lsn,
+                    prev_record_lsn: body.prev_record_lsn,
+                    ancestor_timeline: body.ancestor_timeline,
+                    ancestor_lsn: body.ancestor_lsn,
+                    latest_gc_cutoff_lsn: body.latest_gc_cutoff_lsn,
+                    initdb_lsn: body.initdb_lsn,
+                    pg_version: body.pg_version,
+                    replica_lsn: None,
+                }
+            }
+            _ => bail!("unsupported metadata format version {}", hdr.format_version),
+        };

         hdr.format_version = METADATA_FORMAT_VERSION;

         Ok(Self { hdr, body })
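
The dispatch above gives every supported on-disk version a lossless path into the new body. A compact restatement of the mapping, as a sketch rather than verbatim code from this commit:

    // Sketch: the version-to-body mapping implied by this commit.
    fn body_variant(format_version: u16) -> Option<&'static str> {
        match format_version {
            3 => Some("TimelineMetadataBodyV1, upgraded on read (pg_version pinned to 14)"),
            4 => Some("TimelineMetadataBodyV2, upgraded on read (pg_version preserved)"),
            5 => Some("TimelineMetadataBodyV3, current (adds replica_lsn)"),
            _ => None, // bails: unsupported metadata format version
        }
    }

In both upgrade paths replica_lsn starts out as None; only timelines created by create_timeline_replica ever carry Some(lsn).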
@@ -174,7 +210,7 @@ impl TimelineMetadata {
             TimelineMetadata::upgrade_timeline_metadata(metadata_bytes)
         } else {
             let body =
-                TimelineMetadataBodyV2::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
+                TimelineMetadataBodyV3::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
             ensure!(
                 body.disk_consistent_lsn.is_aligned(),
                 "disk_consistent_lsn is not aligned"
@@ -12,7 +12,7 @@ use utils::id::{TenantId, TimelineId};
 use super::index::LayerFileMetadata;

 /// Serializes and uploads the given index part data to the remote storage.
-pub(super) async fn upload_index_part<'a>(
+pub async fn upload_index_part<'a>(
     conf: &'static PageServerConf,
     storage: &'a GenericRemoteStorage,
     tenant_id: TenantId,
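
The visibility change lets Tenant::create_timeline_replica above upload a hand-built index_part.json directly. The call shape, matching the signature shown (a sketch; the trailing parameters are assumed from the caller above):

    upload_index_part(conf, storage, tenant_id, timeline_id, &index_part).await?;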
@@ -215,6 +215,18 @@ impl LayerFileName {
             Self::Delta(fname) => fname.to_string(),
         }
     }
+    pub fn lsn_range(&self) -> Range<Lsn> {
+        match self {
+            Self::Image(fname) => fname.lsn..fname.lsn + 1,
+            Self::Delta(fname) => fname.lsn_range.clone(),
+        }
+    }
+    pub fn key_range(&self) -> Range<Key> {
+        match self {
+            Self::Image(fname) => fname.key_range.clone(),
+            Self::Delta(fname) => fname.key_range.clone(),
+        }
+    }
 }

 impl From<ImageFileName> for LayerFileName {
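
These accessors give image and delta layers a uniform coordinate view: an image layer at LSN L is treated as the degenerate range L..L+1. A sketch of how create_timeline_replica above relies on this (layer_names is an assumed local):

    // Order a mixed set of image and delta layers by their start LSN and
    // take the end of the last range as the replica cut-off point.
    layer_names.sort_by_key(|name| name.lsn_range().start);
    let replica_lsn = layer_names
        .last()
        .map_or(Lsn(0), |name| name.lsn_range().end);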