Fix build problems with create_timeline_replica
@@ -380,6 +380,7 @@ impl PageServerNode {
                 .map(|x| x.parse::<bool>())
                 .transpose()
                 .context("Failed to parse 'gc_feedback' as bool")?,
+            master_region: settings.remove("master_region").map(|x| x.to_string()),
         };

         // If tenant ID was not specified, generate one
@@ -479,6 +480,7 @@ impl PageServerNode {
                 .map(|x| x.parse::<bool>())
                 .transpose()
                 .context("Failed to parse 'gc_feedback' as bool")?,
+            master_region: settings.remove("master_region").map(|x| x.to_string()),
         }
     };
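Note: the `gc_feedback` parsing above uses the `map`/`transpose` idiom: an absent setting stays `None`, a present one is parsed, and a parse failure is surfaced as an error instead of being swallowed. A minimal self-contained sketch of the same pattern (the `settings` map below is a hypothetical stand-in, not the real config structure):

    use std::collections::HashMap;

    fn main() -> Result<(), std::str::ParseBoolError> {
        let mut settings: HashMap<&str, &str> = HashMap::from([("gc_feedback", "true")]);

        // Option<&str> -> Option<Result<bool, _>> -> Result<Option<bool>, _>
        let gc_feedback: Option<bool> = settings
            .remove("gc_feedback")
            .map(|x| x.parse::<bool>())
            .transpose()?;

        assert_eq!(gc_feedback, Some(true));
        Ok(())
    }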
@@ -342,10 +342,14 @@ impl Debug for S3Config {

 impl RemoteStorageConfig {
     pub fn in_region(&self, region: String) -> anyhow::Result<RemoteStorageConfig> {
-        if let AwsS3(config) = &self.storage {
-            let mut storage = config.clone();
-            storage.region = region;
-            Ok(RemoteStorageConfig { storage, ..self })
+        let self_clone = self.clone();
+        if let RemoteStorageKind::AwsS3(config) = self_clone.storage {
+            let mut storage = config;
+            storage.bucket_region = region;
+            Ok(RemoteStorageConfig {
+                storage: RemoteStorageKind::AwsS3(storage),
+                ..self_clone
+            })
         } else {
             bail!("Only AwsS3 storage can be used in another region")
         }
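Note: the rewritten `in_region` clones the whole config first so it can move the `AwsS3` payload out by value, then rebuilds the enum variant around the modified copy with struct-update syntax. A standalone sketch of the same shape; the types and field names (`S3Config`, `RemoteStorageKind`, `bucket_region`, `max_concurrent_syncs`) are simplified stand-ins, not the real crate definitions:

    use anyhow::bail;

    #[derive(Clone, Debug)]
    struct S3Config {
        bucket_name: String,
        bucket_region: String,
    }

    #[derive(Clone, Debug)]
    enum RemoteStorageKind {
        AwsS3(S3Config),
        LocalFs(String),
    }

    #[derive(Clone, Debug)]
    struct RemoteStorageConfig {
        max_concurrent_syncs: usize,
        storage: RemoteStorageKind,
    }

    impl RemoteStorageConfig {
        fn in_region(&self, region: String) -> anyhow::Result<RemoteStorageConfig> {
            // Clone first: we only have &self, but want to move `storage` out by value.
            let self_clone = self.clone();
            if let RemoteStorageKind::AwsS3(config) = self_clone.storage {
                let mut storage = config;
                storage.bucket_region = region;
                // Rebuild the variant; `..self_clone` keeps every other field.
                Ok(RemoteStorageConfig {
                    storage: RemoteStorageKind::AwsS3(storage),
                    ..self_clone
                })
            } else {
                bail!("Only AwsS3 storage can be used in another region")
            }
        }
    }

    fn main() -> anyhow::Result<()> {
        let cfg = RemoteStorageConfig {
            max_concurrent_syncs: 10,
            storage: RemoteStorageKind::AwsS3(S3Config {
                bucket_name: "pageserver".into(),
                bucket_region: "us-east-1".into(),
            }),
        };
        let moved = cfg.in_region("eu-west-1".into())?;
        println!("{moved:?}");
        Ok(())
    }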
@@ -142,6 +142,7 @@ fn handle_metadata(
             meta.latest_gc_cutoff_lsn(),
             meta.initdb_lsn(),
             meta.pg_version(),
+            meta.replica_lsn(),
         );
         update_meta = true;
     }
@@ -154,6 +155,7 @@ fn handle_metadata(
             meta.latest_gc_cutoff_lsn(),
             meta.initdb_lsn(),
             meta.pg_version(),
+            meta.replica_lsn(),
         );
         update_meta = true;
     }
@@ -166,6 +168,7 @@ fn handle_metadata(
             *latest_gc_cuttoff,
             meta.initdb_lsn(),
             meta.pg_version(),
+            meta.replica_lsn(),
         );
         update_meta = true;
     }
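Note: these call sites, together with the `replica_lsn()` accessor added further down, imply that `TimelineMetadata::new` gained a trailing `Option<Lsn>` parameter. A hypothetical sketch of the extended constructor with the other parameters trimmed away (only the new last argument and its accessor are the point):

    // Illustrative only: a cut-down TimelineMetadata with the new field.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    struct Lsn(u64);

    #[derive(Debug)]
    struct TimelineMetadata {
        disk_consistent_lsn: Lsn,
        pg_version: u32,
        // New in this commit: where replication stops; None on a master timeline.
        replica_lsn: Option<Lsn>,
    }

    impl TimelineMetadata {
        fn new(disk_consistent_lsn: Lsn, pg_version: u32, replica_lsn: Option<Lsn>) -> Self {
            TimelineMetadata { disk_consistent_lsn, pg_version, replica_lsn }
        }

        fn replica_lsn(&self) -> Option<Lsn> {
            self.replica_lsn
        }
    }

    fn main() {
        let master = TimelineMetadata::new(Lsn(0x200), 15, None);
        let replica = TimelineMetadata::new(Lsn(0x200), 15, Some(Lsn(0x180)));
        assert_eq!(master.replica_lsn(), None);
        assert_eq!(replica.replica_lsn(), Some(Lsn(0x180)));
    }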
@@ -11,6 +11,7 @@
 //! parent timeline, and the last LSN that has been written to disk.
 //!

+use crate::repository::Key;
 use anyhow::{bail, Context};
 use futures::FutureExt;
 use pageserver_api::models::TimelineState;
@@ -60,12 +61,11 @@ use crate::task_mgr;
 use crate::task_mgr::TaskKind;
 use crate::tenant::config::TenantConfOpt;
 use crate::tenant::metadata::load_metadata;
-use crate::tenant::remote_timeline_client::index::IndexPart;
-use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart;
-use crate::tenant::remote_timeline_client::PersistIndexPartWithDeletedFlagError;
-use crate::tenant::storage_layer::DeltaLayer;
-use crate::tenant::storage_layer::ImageLayer;
-use crate::tenant::storage_layer::Layer;
+use crate::tenant::remote_timeline_client::index::{IndexPart, LayerFileMetadata};
+use crate::tenant::remote_timeline_client::{
+    MaybeDeletedIndexPart, PersistIndexPartWithDeletedFlagError,
+};
+use crate::tenant::storage_layer::{range_eq, DeltaLayer, ImageLayer, Layer, LayerFileName};
 use crate::InitializationOrder;

 use crate::virtual_file::VirtualFile;
@@ -1294,6 +1294,7 @@ impl Tenant {
             initdb_lsn,
             initdb_lsn,
             pg_version,
+            None,
         );
         self.prepare_timeline(
             new_timeline_id,
@@ -1328,11 +1329,12 @@ impl Tenant {
     pub async fn create_timeline_replica(
         &self,
         timeline_id: TimelineId,
-        master_borker_endpoint: String,
+        master_broker_endpoint: String,
         ctx: &RequestContext,
     ) -> anyhow::Result<()> {
         let broker_client =
-            storage_broker::connect(master_broker_endpoint, conf.broker_keepalive_interval).await?;
+            storage_broker::connect(master_broker_endpoint, self.conf.broker_keepalive_interval)?;

         let master_storage = self
             .master_storage
             .as_ref()
@@ -1343,6 +1345,7 @@ impl Tenant {
             self.tenant_id,
             timeline_id,
         );
+
         let remote_storage = self
             .master_storage
             .as_ref()
@@ -1363,7 +1366,6 @@ impl Tenant {

         // Download & parse index parts
         let mut part_downloads = JoinSet::new();
         let mut remote_index_and_client = HashMap::new();
-
         for timeline_id in remote_timeline_ids {
             let client = RemoteTimelineClient::new(
@@ -1409,41 +1411,54 @@ impl Tenant {
             }
         }

-        let (index_part, client) = remote_index_and_client
-            .get(timeline_id)
-            .expect("timeline found at master")?;
+        let (index_part, _client) = remote_index_and_client
+            .get(&timeline_id)
+            .expect("timeline found at master");
         let mut timeline_metadata = index_part.parse_metadata().context("parse_metadata")?;
-        let layers: Vec<(LayerFileName, IndexLayerMetadata)> = index_part
+
+        // Skip L0 layers, because the LSN range of L0 and L1 layers is the same and it is
+        // not possible to distinguish master and replica layers by LSN.
+        let mut layer_metadata: HashMap<LayerFileName, LayerFileMetadata> = index_part
             .layer_metadata
             .iter()
-            .filter(|(fname, meta)| range_eq(&fname.get_key_range(), &(Key::MIN..Key::MAX)))
-            .sort_by(|(fname, meta)| fname.get_lsn_range().start)
+            .filter(|(fname, _meta)| !range_eq(&fname.get_key_range(), &(Key::MIN..Key::MAX)))
             .map(|(fname, meta)| (fname.clone(), LayerFileMetadata::from(meta)))
             .collect();
-        let replica_lsn = layers
-            .last()
-            .map_or(Lsn(0), |(fname, meta)| fname.get_lsn_range().last);
-        let mut layer_metadata: HashMap<LayerFileName, LayerFileMetadata> = layers.iter().collect();
+
+        // Let replica_lsn be the largest start LSN among the remaining layers
+        let replica_lsn = layer_metadata
+            .keys()
+            .map(|fname| fname.get_lsn_range().start)
+            .max()
+            .unwrap_or(timeline_metadata.ancestor_lsn());

         let old_metadata = timeline_metadata.clone();

         // Now collect layers of ancestor branches. We do not want to reconstruct the exact
         // branch hierarchy at the replica, because in that case we would need to maintain
         // several timelines. Instead, we just add all layers with start LSN < replica_lsn
         // to the current timeline.
         while let Some(ancestor_id) = timeline_metadata.ancestor_timeline() {
-            let (index_part, client) = remote_index_and_client
-                .get(ancestor_id)
-                .expect("timeline found at master")?;
+            let (index_part, _client) = remote_index_and_client
+                .get(&ancestor_id)
+                .expect("timeline found at master");
             for (fname, meta) in &index_part.layer_metadata {
-                if fname.get_lasn_range().start < replica_lsn {
-                    layer_metadata.insert(fname, meta);
+                if !range_eq(&fname.get_key_range(), &(Key::MIN..Key::MAX))
+                    && fname.get_lsn_range().start < replica_lsn
+                {
+                    layer_metadata.insert(fname.clone(), LayerFileMetadata::from(meta));
                 }
             }
             timeline_metadata = index_part.parse_metadata().context("parse_metadata")?;
         }
         let new_metadata = TimelineMetadata::new(
-            old_metadata.disk_consistent_lsn,
-            old_metadata.prev_record_lsn,
+            old_metadata.disk_consistent_lsn(),
+            old_metadata.prev_record_lsn(),
             None,
             Lsn::INVALID,
-            old_metadata.latest_gc_cutoff_lsn,
-            old_metadata.initdb_lsn,
-            old_metadata.pg_version,
-            replica_lsn,
+            old_metadata.latest_gc_cutoff_lsn(),
+            old_metadata.initdb_lsn(),
+            old_metadata.pg_version(),
+            Some(replica_lsn),
         );

         // Initialize data directories for new timeline
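Note: the new code drops L0 layers (those covering the full key range `Key::MIN..Key::MAX`) because their LSN ranges overlap L1 layers, then takes the largest layer start LSN as `replica_lsn`. A self-contained sketch of that selection logic over simplified layer names; `range_eq`, `LayerFileName`, and the scalar `Key`/`Lsn` types here are stand-ins for the pageserver's own:

    use std::ops::Range;

    type Key = u64;
    type Lsn = u64;

    const KEY_MIN: Key = 0;
    const KEY_MAX: Key = u64::MAX;

    #[derive(Debug, Clone)]
    struct LayerFileName {
        key_range: Range<Key>,
        lsn_range: Range<Lsn>,
    }

    // Stand-in for the pageserver's range_eq helper.
    fn range_eq<T: PartialEq>(a: &Range<T>, b: &Range<T>) -> bool {
        a.start == b.start && a.end == b.end
    }

    fn main() {
        let layers = vec![
            // An L0 layer: full key range, gets skipped.
            LayerFileName { key_range: KEY_MIN..KEY_MAX, lsn_range: 0x100..0x200 },
            // L1 layers: partial key ranges, kept.
            LayerFileName { key_range: 0..100, lsn_range: 0x100..0x180 },
            LayerFileName { key_range: 100..200, lsn_range: 0x180..0x200 },
        ];

        let kept: Vec<&LayerFileName> = layers
            .iter()
            .filter(|l| !range_eq(&l.key_range, &(KEY_MIN..KEY_MAX)))
            .collect();

        // replica_lsn = the largest start LSN among the remaining layers.
        let replica_lsn = kept.iter().map(|l| l.lsn_range.start).max().unwrap_or(0);
        assert_eq!(replica_lsn, 0x180);
    }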
@@ -1452,54 +1467,49 @@ impl Tenant {
             .context("Failed to create new timeline directory")?;

         // Save timeline metadata
-        save_metadata(self.conf, timeline_id, self.tenant_id, new_metadata, true)
+        save_metadata(self.conf, timeline_id, self.tenant_id, &new_metadata, true)
             .context("Failed to create timeline metadata")?;

+        // construct new index_part.json with combined list of layers
+        let index_part = IndexPart::new(
+            layer_metadata,
+            old_metadata.disk_consistent_lsn(),
+            new_metadata.to_bytes()?,
+        );
+
+        remote_client.init_upload_queue(&index_part)?;
+
         let timeline = Timeline::new(
             self.conf,
             Arc::clone(&self.tenant_conf),
-            new_metadata,
+            &new_metadata,
             None, // we do not need to restore the branch hierarchy at the replica
             timeline_id,
             self.tenant_id,
             Arc::clone(&self.walredo_mgr),
-            remote_client,
-            master_client,
+            Some(remote_client),
+            Some(master_client),
             Some(replica_lsn),
-            old_metadata.pg_version,
+            old_metadata.pg_version(),
             None, // no need to calculate logical size at replica
             None,
         );
-        // construct new index_part.json with combined list of layers
-        let index_part = IndexPart::new(
-            layer_metadata,
-            old_metadata.disk_consistent_lsn,
-            new_metadata.to_bytes()?,
-        );
-
         /* Do we need to perform explicit upload?
         // Upload this index_part.json to S3 bucket
-        remote_client.upload_index_part(
+        upload_index_part(
             self.conf,
-            &self.remote_storage,
-            self.telnant_id,
+            &remote_storage,
+            self.tenant_id,
             timeline_id,
             &index_part,
-        )?;
+        )
+        .await?;
         */

-        // Start background works for this timelinw
-        timeline.activate(broker_client, true);
+        // Start background work for this timeline
+        timeline.activate(broker_client, None, ctx);

+        // Wait for uploads to complete, so that when we return Ok, the timeline
+        // is known to be durable on remote storage. Just like we do at the end of
+        // this function, after we have created the timeline ourselves.
+        //
+        // We only really care that the initial version of `index_part.json` has
+        // been uploaded. That's enough to remember that the timeline
+        // exists. However, there is no function to wait specifically for that, so
+        // we just wait for all in-progress uploads to finish.
+        remote_client
+            .wait_completion()
+            .await
+            .context("wait for timeline uploads to complete")?;
         Ok(())
     }
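Note: the reordering in this hunk matters: the upload queue is initialized with the first `index_part` before the timeline is constructed and activated, and the function only returns Ok after `wait_completion()` has seen every queued upload finish. A minimal async sketch of that schedule-then-wait shape; the `UploadQueue` type is hypothetical, not the real `RemoteTimelineClient`:

    use tokio::sync::Mutex;
    use tokio::task::JoinSet;

    // Hypothetical stand-in for a remote upload queue.
    #[derive(Default)]
    struct UploadQueue {
        tasks: Mutex<JoinSet<()>>,
    }

    impl UploadQueue {
        async fn schedule_upload(&self, name: &str) {
            let name = name.to_string();
            self.tasks.lock().await.spawn(async move {
                // Pretend to upload the file to remote storage.
                println!("uploaded {name}");
            });
        }

        // Wait for every in-progress upload to finish.
        async fn wait_completion(&self) {
            let mut tasks = self.tasks.lock().await;
            while let Some(res) = tasks.join_next().await {
                res.expect("upload task panicked");
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let queue = UploadQueue::default();
        queue.schedule_upload("index_part.json").await;
        // Return only after the initial index_part is durable remotely.
        queue.wait_completion().await;
        println!("timeline is durable on remote storage");
    }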
@@ -2333,12 +2343,12 @@ fn tree_sort_timelines(

 impl Tenant {
     pub fn tenant_specific_overrides(&self) -> TenantConfOpt {
-        *self.tenant_conf.read().unwrap()
+        self.tenant_conf.read().unwrap().clone()
     }

     pub fn effective_config(&self) -> TenantConf {
         self.tenant_specific_overrides()
-            .merge(self.conf.default_tenant_conf)
+            .merge(self.conf.default_tenant_conf.clone())
     }

     pub fn get_checkpoint_distance(&self) -> u64 {
@@ -2468,7 +2478,7 @@ impl Tenant {
             Arc::clone(&self.walredo_mgr),
             remote_client,
             master_client,
-            new_metadata.replica_lsn,
+            new_metadata.replica_lsn(),
             pg_version,
             initial_logical_size_can_start.cloned(),
             initial_logical_size_attempt.cloned(),
@@ -2485,10 +2495,11 @@ impl Tenant {
     ) -> Tenant {
         let (state, mut rx) = watch::channel(state);

-        let master_storage = if let Some(remote_storage_config) = conf.remote_storage_config {
+        let master_storage = if let Some(remote_storage_config) = &conf.remote_storage_config {
             if let Some(region) = &tenant_conf.master_region {
-                let master_storage_config = remote_storage_config.in_region(region.clone());
-                Some(GenericRemoteStorage::from_config(master_storage_config)?)
+                let master_storage_config =
+                    remote_storage_config.in_region(region.clone()).unwrap();
+                Some(GenericRemoteStorage::from_config(&master_storage_config).unwrap())
             } else {
                 None
             }
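Note the switch from `?` to `unwrap()` here: this constructor returns a bare `Tenant`, not a `Result`, so the `?` operator cannot be used, and the build fix trades error propagation for a panic on a bad master region config. A tiny illustration of the compile error being avoided (the function is hypothetical):

    fn parse_port(s: &str) -> u16 {
        // This function does not return Result, so `s.parse::<u16>()?` would not
        // compile ("the `?` operator can only be used in a function that returns
        // `Result` or `Option`"). The commit resolves the same mismatch with unwrap():
        s.parse::<u16>().unwrap()
    }

    fn main() {
        assert_eq!(parse_port("5432"), 5432);
    }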
@@ -2956,6 +2967,7 @@ impl Tenant {
             *src_timeline.latest_gc_cutoff_lsn.read(), // FIXME: should we hold onto this guard longer?
             src_timeline.initdb_lsn,
             src_timeline.pg_version,
+            None, // no branches at replica
         );

         let new_timeline = {
@@ -3043,6 +3055,7 @@ impl Tenant {
             pgdata_lsn,
             pgdata_lsn,
             pg_version,
+            None,
         );
         let raw_timeline =
             self.prepare_timeline(timeline_id, &new_metadata, timeline_uninit_mark, true, None)?;
@@ -45,7 +45,7 @@ pub mod defaults {
 }

 /// Per-tenant configuration options
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub struct TenantConf {
     // Flush out an inmemory layer, if it's holding WAL older than this
     // This puts a backstop on how much WAL needs to be re-digested if the
@@ -106,7 +106,7 @@ pub struct TenantConf {

 /// Same as TenantConf, but this struct preserves the information about
 /// which parameters are set and which are not.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
 pub struct TenantConfOpt {
     #[serde(skip_serializing_if = "Option::is_none")]
     #[serde(default)]
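Note: dropping `Copy` from these derives follows directly from the new `master_region: Option<String>` field: `String` is not `Copy`, so a struct containing one cannot be `Copy` either. That is why the surrounding hunks replace implicit copies (`*guard`, bare field moves) with explicit `.clone()` calls. A compact illustration:

    #[derive(Debug, Clone, Copy)]
    struct ConfCopy {
        gc_feedback: bool,
    }

    #[derive(Debug, Clone)] // adding an Option<String> field forbids Copy
    struct ConfClone {
        gc_feedback: bool,
        master_region: Option<String>,
    }

    fn main() {
        let a = ConfCopy { gc_feedback: true };
        let b = a; // implicit copy: `a` stays usable
        println!("{a:?} {b:?}");

        let c = ConfClone { gc_feedback: true, master_region: Some("eu-west-1".into()) };
        let d = c.clone(); // must clone explicitly; `let d = c;` would move `c`
        println!("{c:?} {d:?}");
    }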
@@ -254,7 +254,7 @@ impl TenantConfOpt {
                 .evictions_low_residence_duration_metric_threshold
                 .unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold),
             gc_feedback: self.gc_feedback.unwrap_or(global_conf.gc_feedback),
-            master_region: self.master_region,
+            master_region: self.master_region.clone(),
         }
     }
 }
@@ -388,7 +388,7 @@ impl TryFrom<&'_ models::TenantConfig> for TenantConfOpt {
             );
         }
         tenant_conf.gc_feedback = request_data.gc_feedback;
-        tenant_conf.master_region = request_data.master_region;
+        tenant_conf.master_region = request_data.master_region.clone();

         Ok(tenant_conf)
     }
@@ -25,9 +25,6 @@ use crate::virtual_file::VirtualFile;
 /// Use special format number to enable backward compatibility.
 const METADATA_FORMAT_VERSION: u16 = 5;

-/// Previous supported format versions.
-const METADATA_OLD_FORMAT_VERSION: u16 = 3;
-
 /// We assume that a write of up to METADATA_MAX_SIZE bytes is atomic.
 ///
 /// This is the same assumption that PostgreSQL makes with the control file,
@@ -263,6 +260,10 @@ impl TimelineMetadata {
     pub fn pg_version(&self) -> u32 {
         self.body.pg_version
     }
+
+    pub fn replica_lsn(&self) -> Option<Lsn> {
+        self.body.replica_lsn
+    }
 }

 /// Save timeline metadata to file
@@ -366,7 +367,7 @@ mod tests {
         hdr: TimelineMetadataHeader {
             checksum: 0,
             size: 0,
-            format_version: METADATA_OLD_FORMAT_VERSION,
+            format_version: 3,
         },
         body: TimelineMetadataBodyV1 {
             disk_consistent_lsn: Lsn(0x200),
@@ -348,7 +348,7 @@ pub async fn set_new_tenant_config(
     Tenant::persist_tenant_config(
         &tenant.tenant_id(),
         &tenant_config_path,
-        new_tenant_conf,
+        new_tenant_conf.clone(),
         false,
     )
     .map_err(SetNewTenantConfigError::Persist)?;
@@ -215,16 +215,16 @@ impl LayerFileName {
             Self::Delta(fname) => fname.to_string(),
         }
     }
-    pub fn lsn_range(&self) -> Range<Lsn> {
+    pub fn get_lsn_range(&self) -> Range<Lsn> {
         match self {
             Self::Image(fname) => fname.lsn..fname.lsn + 1,
-            Self::Delta(fname) => fname.lsn_range,
+            Self::Delta(fname) => fname.lsn_range.clone(),
         }
     }
-    pub fn key_range(&self) -> Range<Key> {
+    pub fn get_key_range(&self) -> Range<Key> {
         match self {
-            Self::Image(fname) => fname.key_range,
-            Self::Delta(fname) => fname.key_range,
+            Self::Image(fname) => fname.key_range.clone(),
+            Self::Delta(fname) => fname.key_range.clone(),
         }
     }
 }
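Note: `Range<T>` is deliberately not `Copy` in the standard library (a copied range could silently be iterated twice), so returning `fname.key_range` from a method that takes `&self` would try to move out of a borrow. The renamed accessors therefore return clones. A minimal reproduction with a simplified stand-in type:

    use std::ops::Range;

    struct DeltaFileName {
        lsn_range: Range<u64>,
    }

    impl DeltaFileName {
        fn get_lsn_range(&self) -> Range<u64> {
            // Returning `self.lsn_range` alone would fail: "cannot move out of
            // `self.lsn_range` which is behind a shared reference". Range<u64>
            // is not Copy, so clone it.
            self.lsn_range.clone()
        }
    }

    fn main() {
        let f = DeltaFileName { lsn_range: 0x100..0x200 };
        assert_eq!(f.get_lsn_range(), 0x100..0x200);
    }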
@@ -2954,6 +2954,7 @@ impl Timeline {
             *self.latest_gc_cutoff_lsn.read(),
             self.initdb_lsn,
             self.pg_version,
+            self.replica_lsn,
         );

         fail_point!("checkpoint-before-saving-metadata", |x| bail!(