mirror of https://github.com/neondatabase/neon.git
synced 2026-05-16 12:40:36 +00:00

Compare commits
7 Commits
problame/e ... cross_regi
Commits (7):

- d55a33a8a2
- 3fa72bbf5d
- d1613ebae3
- 07c8f70a3b
- 77a73ff36f
- 9030abb426
- 6e549097e8
@@ -380,6 +380,10 @@ impl PageServerNode {
                .map(|x| x.parse::<bool>())
                .transpose()
                .context("Failed to parse 'gc_feedback' as bool")?,
            master_region: settings.remove("master_region").map(|x| x.to_string()),
            master_broker_endpoint: settings
                .remove("master_broker_endpoint")
                .map(|x| x.to_string()),
        };
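A minimal sketch of the settings flow above — the keys match the code, while the map and its values are hypothetical. Each key is consumed with `remove()`, so any leftover entries can later be rejected as unknown settings:

```rust
use std::collections::HashMap;

// Hypothetical helper mirroring the parsing pattern in the hunk above.
fn extract_replica_settings(
    settings: &mut HashMap<&str, &str>,
) -> (Option<String>, Option<String>) {
    let master_region = settings.remove("master_region").map(|x| x.to_string());
    let master_broker_endpoint = settings
        .remove("master_broker_endpoint")
        .map(|x| x.to_string());
    (master_region, master_broker_endpoint)
}
```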
        // If tenant ID was not specified, generate one
@@ -479,6 +483,10 @@ impl PageServerNode {
                .map(|x| x.parse::<bool>())
                .transpose()
                .context("Failed to parse 'gc_feedback' as bool")?,
            master_region: settings.remove("master_region").map(|x| x.to_string()),
            master_broker_endpoint: settings
                .remove("master_broker_endpoint")
                .map(|x| x.to_string()),
        }
    };

@@ -223,6 +223,8 @@ pub struct TenantConfig {
    pub min_resident_size_override: Option<u64>,
    pub evictions_low_residence_duration_metric_threshold: Option<String>,
    pub gc_feedback: Option<bool>,
    pub master_region: Option<String>,
    pub master_broker_endpoint: Option<String>,
}

#[serde_as]
@@ -282,6 +284,8 @@ impl TenantConfigRequest {
            min_resident_size_override: None,
            evictions_low_residence_duration_metric_threshold: None,
            gc_feedback: None,
            master_region: None,
            master_broker_endpoint: None,
        };
        TenantConfigRequest { tenant_id, config }
    }
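A sketch of building a config request that carries the new fields. The values are hypothetical, and `TenantConfig::default()` is an assumption (all other fields stay `None`, as in the constructor above):

```rust
let config = TenantConfig {
    master_region: Some("us-east-2".to_string()),
    master_broker_endpoint: Some("http://storage-broker.us-east-2.internal:50051".to_string()),
    ..TenantConfig::default() // assumption: Default is derived for TenantConfig
};
let request = TenantConfigRequest { tenant_id, config }; // tenant_id as in the constructor above
```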
@@ -341,6 +341,20 @@ impl Debug for S3Config {
}

impl RemoteStorageConfig {
    pub fn in_region(&self, region: String) -> anyhow::Result<RemoteStorageConfig> {
        let self_clone = self.clone();
        if let RemoteStorageKind::AwsS3(config) = self_clone.storage {
            let mut storage = config;
            storage.bucket_region = region;
            Ok(RemoteStorageConfig {
                storage: RemoteStorageKind::AwsS3(storage),
                ..self_clone
            })
        } else {
            bail!("Only AWS S3 storage can be used in another region")
        }
    }

    pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<Option<RemoteStorageConfig>> {
        let local_path = toml.get("local_path");
        let bucket_name = toml.get("bucket_name");
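The `in_region` helper above is what later wires up master-region storage. A usage sketch, matching the call sites further down in this diff (the region value is hypothetical):

```rust
// Derive a config pointing at the same bucket layout in the master's region,
// then build a storage client from it. Fails for non-S3 storage kinds.
let master_config = remote_storage_config.in_region("us-east-2".to_string())?;
let master_storage = GenericRemoteStorage::from_config(&master_config)?;
```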
@@ -142,6 +142,7 @@ fn handle_metadata(
        meta.latest_gc_cutoff_lsn(),
        meta.initdb_lsn(),
        meta.pg_version(),
        meta.replica_lsn(),
    );
    update_meta = true;
}
@@ -154,6 +155,7 @@ fn handle_metadata(
        meta.latest_gc_cutoff_lsn(),
        meta.initdb_lsn(),
        meta.pg_version(),
        meta.replica_lsn(),
    );
    update_meta = true;
}
@@ -166,6 +168,7 @@ fn handle_metadata(
        *latest_gc_cuttoff,
        meta.initdb_lsn(),
        meta.pg_version(),
        meta.replica_lsn(),
    );
    update_meta = true;
}

@@ -799,12 +799,8 @@ impl PageCache {
    // a different victim. But if the problem persists, the page cache
    // could fill up with dirty pages that we cannot evict, and we will
    // loop retrying the writebacks indefinitely.
    if cfg!(test) {
        anyhow::bail!("writeback of buffer {:?} failed: {}", old_key, err);
    } else {
        error!("writeback of buffer {:?} failed: {}", old_key, err);
        continue;
    }
    error!("writeback of buffer {:?} failed: {}", old_key, err);
    continue;
}
}

@@ -1601,6 +1601,9 @@ pub fn create_test_timeline(
    ctx: &RequestContext,
) -> anyhow::Result<std::sync::Arc<Timeline>> {
    let tline = tenant.create_test_timeline(timeline_id, Lsn(8), pg_version, ctx)?;
    let mut m = tline.begin_modification(Lsn(8));
    m.init_empty()?;
    m.commit()?;
    Ok(tline)
}
@@ -60,12 +60,11 @@ use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::metadata::load_metadata;
use crate::tenant::remote_timeline_client::index::IndexPart;
use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart;
use crate::tenant::remote_timeline_client::PersistIndexPartWithDeletedFlagError;
use crate::tenant::storage_layer::DeltaLayer;
use crate::tenant::storage_layer::ImageLayer;
use crate::tenant::storage_layer::Layer;
use crate::tenant::remote_timeline_client::index::{IndexPart, LayerFileMetadata};
use crate::tenant::remote_timeline_client::{
    MaybeDeletedIndexPart, PersistIndexPartWithDeletedFlagError,
};
use crate::tenant::storage_layer::{DeltaLayer, ImageLayer, Layer, LayerFileName};
use crate::InitializationOrder;

use crate::virtual_file::VirtualFile;
@@ -86,7 +85,6 @@ pub mod block_io;
pub mod disk_btree;
pub(crate) mod ephemeral_file;
pub mod layer_map;
pub mod manifest;

pub mod metadata;
mod par_fsync;
@@ -154,6 +152,9 @@ pub struct Tenant {
    // provides access to timeline data sitting in the remote storage
    remote_storage: Option<GenericRemoteStorage>,

    // for cross-region replication: provides access to the master S3 bucket
    master_storage: Option<GenericRemoteStorage>,

    /// Cached logical sizes updated on each [`Tenant::gather_size_inputs`].
    cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
    cached_synthetic_tenant_size: Arc<AtomicU64>,
@@ -489,7 +490,6 @@ impl std::fmt::Display for WaitToBecomeActiveError {
    }
}

#[derive(Debug)]
pub(crate) enum ShutdownError {
    AlreadyStopping,
}
@@ -665,7 +665,7 @@ impl Tenant {
    match tenant_clone.attach(&ctx).await {
        Ok(()) => {
            info!("attach finished, activating");
            tenant_clone.activate(broker_client, None, &ctx);
            tenant_clone.activate(broker_client, None, &ctx)?;
        }
        Err(e) => {
            error!("attach failed, setting tenant state to Broken: {:?}", e);
@@ -952,7 +952,7 @@ impl Tenant {
        Ok(()) => {
            debug!("load finished, activating");
            let background_jobs_can_start = init_order.as_ref().map(|x| &x.background_jobs_can_start);
            tenant_clone.activate(broker_client, background_jobs_can_start, &ctx);
            tenant_clone.activate(broker_client, background_jobs_can_start, &ctx)?;
        }
        Err(err) => {
            error!("load failed, setting tenant state to Broken: {err:?}");
@@ -1269,18 +1269,6 @@ impl Tenant {
    /// This is used to create the initial 'main' timeline during bootstrapping,
    /// or when importing a new base backup. The caller is expected to load an
    /// initial image of the datadir to the new timeline after this.
    ///
    /// Until that happens, the on-disk state is invalid (disk_consistent_lsn=Lsn(0))
    /// and the timeline will fail to load at a restart.
    ///
    /// That's why we add an uninit mark file, and wrap it together with the Timeline
    /// in-memory object into UninitializedTimeline.
    /// Once the caller is done setting up the timeline, they should call
    /// `UninitializedTimeline::initialize_with_lock` to remove the uninit mark.
    ///
    /// For tests, use `DatadirModification::init_empty` + `commit` to setup the
    /// minimum amount of keys required to get a working timeline.
    /// (Without it, `put` might fail due to `repartition` failing.)
    pub fn create_empty_timeline(
        &self,
        new_timeline_id: TimelineId,
@@ -1305,6 +1293,7 @@ impl Tenant {
        initdb_lsn,
        initdb_lsn,
        pg_version,
        None,
    );
    self.prepare_timeline(
        new_timeline_id,
@@ -1329,26 +1318,225 @@ impl Tenant {
    ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
    let uninit_tl = self.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)?;

    // Setup minimum keys required for the timeline to be usable.
    let mut modification = uninit_tl
        .raw_timeline()
        .expect("we just created it")
        .begin_modification(initdb_lsn);
    modification.init_empty().context("init_empty")?;
    modification
        .commit()
        .context("commit init_empty modification")?;

    let mut timelines = self.timelines.lock().unwrap();
    // load_layers=false because create_empty_timeline already did what's necessary (set next_open_layer)
    // and modification.init_empty() already created layers.
    let tl = uninit_tl.initialize_with_lock(ctx, &mut timelines, false)?;
    let tl = uninit_tl.initialize_with_lock(ctx, &mut timelines, true)?;
    // The non-test code would call tl.activate() here.
    tl.set_state(TimelineState::Active);
    Ok(tl)
}
pub async fn create_timeline_replica(
    &self,
    timeline_id: TimelineId,
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    // We need to connect to the broker in the master's region to choose a safekeeper to subscribe to
    let master_broker_endpoint = self
        .tenant_conf
        .read()
        .unwrap()
        .master_broker_endpoint
        .as_ref()
        .unwrap()
        .clone();
    let broker_client =
        storage_broker::connect(master_broker_endpoint, self.conf.broker_keepalive_interval)?;

    // Access to S3 bucket in master's region
    let master_storage = self
        .master_storage
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("master storage not specified"))?;
    let master_client = RemoteTimelineClient::new(
        master_storage.clone(),
        self.conf,
        self.tenant_id,
        timeline_id,
    );

    // Access to local S3 bucket in this region
    let remote_storage = self
        .remote_storage
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("remote storage not specified"))?;
    let remote_client = RemoteTimelineClient::new(
        remote_storage.clone(),
        self.conf,
        self.tenant_id,
        timeline_id,
    );

    // Get list of all timelines from master. We actually do not need all of them - only
    // the ancestors of the target timeline.
    let remote_timeline_ids = remote_timeline_client::list_remote_timelines(
        master_storage,
        self.conf,
        self.tenant_id,
    )
    .await?;

    // Download & parse index parts
    let mut part_downloads = JoinSet::new();

    for timeline_id in remote_timeline_ids {
        let client = RemoteTimelineClient::new(
            master_storage.clone(),
            self.conf,
            self.tenant_id,
            timeline_id,
        );
        part_downloads.spawn(
            async move {
                debug!("starting index part download");

                let index_part = client
                    .download_index_file()
                    .await
                    .context("download index file")?;

                debug!("finished index part download");

                Result::<_, anyhow::Error>::Ok((timeline_id, client, index_part))
            }
            .map(move |res| {
                res.with_context(|| format!("download index part for timeline {timeline_id}"))
            })
            .instrument(info_span!("download_index_part", timeline=%timeline_id)),
        );
    }
    // Wait for all the download tasks to complete & collect results.
    let mut remote_index_and_client = HashMap::new();
    while let Some(result) = part_downloads.join_next().await {
        // NB: we already added timeline_id as context to the error
        let result: Result<_, anyhow::Error> = result.context("joinset task join")?;
        let (timeline_id, client, index_part) = result?;
        debug!("successfully downloaded index part for timeline {timeline_id}");
        match index_part {
            MaybeDeletedIndexPart::IndexPart(index_part) => {
                remote_index_and_client.insert(timeline_id, (index_part, client));
            }
            MaybeDeletedIndexPart::Deleted(_) => {
                info!("timeline {} is deleted, skipping", timeline_id);
                continue;
            }
        }
    }

    let (index_part, _client) = remote_index_and_client
        .get(&timeline_id)
        .expect("timeline found at master");
    let mut timeline_metadata = index_part.parse_metadata().context("parse_metadata")?;

    // Convert IndexLayerMetadata to LayerFileMetadata
    let mut layer_metadata: HashMap<LayerFileName, LayerFileMetadata> = index_part
        .layer_metadata
        .iter()
        .map(|(fname, meta)| (fname.clone(), LayerFileMetadata::from(meta)))
        .collect();

    // Let replica_lsn be the largest end LSN
    let replica_lsn = layer_metadata
        .keys()
        .map(|fname| fname.get_lsn_range().end)
        .max()
        .unwrap_or(timeline_metadata.ancestor_lsn());

    let old_metadata = timeline_metadata.clone();

    // Now collect layers of ancestor branches. We do not want to reconstruct the exact branch
    // hierarchy at the replica, because in that case we would need to maintain several timelines.
    // Instead, we just collect all layers which may be required for the current timeline.
    while let Some(ancestor_id) = timeline_metadata.ancestor_timeline() {
        let (index_part, _client) = remote_index_and_client
            .get(&ancestor_id)
            .expect("timeline found at master");
        for (fname, meta) in &index_part.layer_metadata {
            if fname.get_lsn_range().start < timeline_metadata.ancestor_lsn() {
                layer_metadata.insert(fname.clone(), LayerFileMetadata::from(meta));
            }
        }
        timeline_metadata = index_part.parse_metadata().context("parse_metadata")?;
    }
    let new_metadata = TimelineMetadata::new(
        old_metadata.disk_consistent_lsn(),
        old_metadata.prev_record_lsn(),
        None,
        Lsn::INVALID,
        old_metadata.latest_gc_cutoff_lsn(),
        old_metadata.initdb_lsn(),
        old_metadata.pg_version(),
        Some(replica_lsn),
    );

    // Initialize data directories for new timeline
    tokio::fs::create_dir_all(self.conf.timeline_path(&timeline_id, &self.tenant_id))
        .await
        .context("Failed to create new timeline directory")?;

    // Save timeline metadata
    save_metadata(self.conf, timeline_id, self.tenant_id, &new_metadata, true)
        .context("Failed to create timeline metadata")?;

    // Construct a new index_part.json with the combined list of layers
    let index_part = IndexPart::new(
        layer_metadata,
        old_metadata.disk_consistent_lsn(),
        new_metadata.to_bytes()?,
    );

    remote_client.init_upload_queue(&index_part)?;

    let timeline = Timeline::new(
        self.conf,
        Arc::clone(&self.tenant_conf),
        &new_metadata,
        None, // we do not need to restore the branch hierarchy at the replica
        timeline_id,
        self.tenant_id,
        Arc::clone(&self.walredo_mgr),
        Some(remote_client),
        Some(master_client),
        Some(replica_lsn),
        old_metadata.pg_version(),
        None, // no need to calculate logical size at replica
        None,
    );

    // Wait for completion of the index part upload
    timeline
        .remote_client
        .as_ref()
        .unwrap()
        .wait_completion()
        .await
        .context("wait for index part upload to complete")?;

    /* Do we need to perform an explicit upload?
    // Upload this index_part.json to S3 bucket
    upload_index_part(
        self.conf,
        &remote_storage,
        self.tenant_id,
        timeline_id,
        &index_part,
    )
    .await?;
    */

    timeline
        .create_remote_layers(
            &index_part,
            HashMap::new(), // no local layers
            replica_lsn,
        )
        .await?;

    // Start background tasks for this timeline
    timeline.activate(broker_client, None, ctx);

    Ok(())
}
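A sketch of how a caller might drive the function above. The tenant must have been configured with `master_region` and `master_broker_endpoint`, otherwise the unwraps at the top of the function panic; the call site itself is hypothetical:

```rust
// Replicate an existing master-region timeline into this region's pageserver.
// timeline_id must exist in the master's S3 bucket.
tenant.create_timeline_replica(timeline_id, &ctx).await?;
```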
/// Create a new timeline.
///
/// Returns the new timeline ID and reference to its Timeline object.
@@ -1429,7 +1617,7 @@ impl Tenant {
        }
    };

    loaded_timeline.activate(broker_client, None, ctx);
    loaded_timeline.activate(self.get_broker_channel(broker_client)?, None, ctx);

    if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
        // Wait for the upload of the 'index_part.json` file to finish, so that when we return
@@ -1845,6 +2033,21 @@ impl Tenant {
    self.current_state() == TenantState::Active
}

fn get_broker_channel(
    &self,
    broker_client: BrokerClientChannel,
) -> anyhow::Result<BrokerClientChannel> {
    let tenant_config_guard = self.tenant_conf.read().unwrap();
    if let Some(master_broker_endpoint) = &tenant_config_guard.master_broker_endpoint {
        storage_broker::connect(
            master_broker_endpoint.clone(),
            self.conf.broker_keepalive_interval,
        )
    } else {
        Ok(broker_client)
    }
}
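Every activation path below now routes its channel through this helper, so replica tenants talk to the master region's broker while ordinary tenants keep the channel they were given. A usage sketch, mirroring the `activate` hunk that follows:

```rust
// For a replica tenant this opens a fresh connection to the master's broker;
// for a normal tenant it just returns broker_client unchanged.
let channel = self.get_broker_channel(broker_client.clone())?;
timeline.activate(channel, background_jobs_can_start, ctx);
```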
/// Changes tenant status to active, unless shutdown was already requested.
///
/// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup
@@ -1854,7 +2057,7 @@ impl Tenant {
    broker_client: BrokerClientChannel,
    background_jobs_can_start: Option<&completion::Barrier>,
    ctx: &RequestContext,
) {
) -> anyhow::Result<()> {
    debug_assert_current_span_has_tenant_id();

    let mut activating = false;
@@ -1890,7 +2093,11 @@ impl Tenant {
    let mut activated_timelines = 0;

    for timeline in timelines_to_activate {
        timeline.activate(broker_client.clone(), background_jobs_can_start, ctx);
        timeline.activate(
            self.get_broker_channel(broker_client.clone())?,
            background_jobs_can_start,
            ctx,
        );
        activated_timelines += 1;
    }

@@ -1916,6 +2123,7 @@ impl Tenant {
            );
        });
    }
    Ok(())
}

/// Shutdown the tenant and join all of the spawned tasks.
@@ -1927,8 +2135,6 @@ impl Tenant {
/// This will attempt to shutdown even if the tenant is broken.
pub(crate) async fn shutdown(&self, freeze_and_flush: bool) -> Result<(), ShutdownError> {
    debug_assert_current_span_has_tenant_id();
    debug!("start");

    // Set tenant (and its timelines) to Stopping state.
    //
    // Since we can only transition into Stopping state after activation is complete,
@@ -1975,7 +2181,6 @@ impl Tenant {
    // this will additionally shutdown and await all timeline tasks.
    task_mgr::shutdown_tasks(None, Some(self.tenant_id), None).await;

    debug!("complete");
    Ok(())
}
@@ -2182,12 +2387,12 @@ fn tree_sort_timelines(

impl Tenant {
    pub fn tenant_specific_overrides(&self) -> TenantConfOpt {
        *self.tenant_conf.read().unwrap()
        self.tenant_conf.read().unwrap().clone()
    }

    pub fn effective_config(&self) -> TenantConf {
        self.tenant_specific_overrides()
            .merge(self.conf.default_tenant_conf)
            .merge(self.conf.default_tenant_conf.clone())
    }

    pub fn get_checkpoint_distance(&self) -> u64 {
@@ -2296,6 +2501,16 @@ impl Tenant {
    let initial_logical_size_can_start = init_order.map(|x| &x.initial_logical_size_can_start);
    let initial_logical_size_attempt = init_order.map(|x| &x.initial_logical_size_attempt);

    let master_client = if let Some(master_storage) = &self.master_storage {
        Some(RemoteTimelineClient::new(
            master_storage.clone(),
            self.conf,
            self.tenant_id,
            new_timeline_id,
        ))
    } else {
        None
    };
    let pg_version = new_metadata.pg_version();
    Ok(Timeline::new(
        self.conf,
@@ -2306,6 +2521,8 @@ impl Tenant {
        self.tenant_id,
        Arc::clone(&self.walredo_mgr),
        remote_client,
        master_client,
        new_metadata.replica_lsn(),
        pg_version,
        initial_logical_size_can_start.cloned(),
        initial_logical_size_attempt.cloned(),
@@ -2322,6 +2539,18 @@ impl Tenant {
) -> Tenant {
    let (state, mut rx) = watch::channel(state);

    let master_storage = if let Some(remote_storage_config) = &conf.remote_storage_config {
        if let Some(region) = &tenant_conf.master_region {
            let master_storage_config =
                remote_storage_config.in_region(region.clone()).unwrap();
            Some(GenericRemoteStorage::from_config(&master_storage_config).unwrap())
        } else {
            None
        }
    } else {
        None
    };

    tokio::spawn(async move {
        let mut current_state: &'static str = From::from(&*rx.borrow_and_update());
        let tid = tenant_id.to_string();
@@ -2360,6 +2589,7 @@ impl Tenant {
    gc_cs: tokio::sync::Mutex::new(()),
    walredo_mgr,
    remote_storage,
    master_storage,
    state,
    cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
    cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
@@ -2781,6 +3011,7 @@ impl Tenant {
    *src_timeline.latest_gc_cutoff_lsn.read(), // FIXME: should we hold onto this guard longer?
    src_timeline.initdb_lsn,
    src_timeline.pg_version,
    None, // no branches at replica
);

let new_timeline = {
@@ -2868,6 +3099,7 @@ impl Tenant {
    pgdata_lsn,
    pgdata_lsn,
    pg_version,
    None,
);
let raw_timeline =
    self.prepare_timeline(timeline_id, &new_metadata, timeline_uninit_mark, true, None)?;
@@ -3452,7 +3684,6 @@ pub mod harness {
    pub conf: &'static PageServerConf,
    pub tenant_conf: TenantConf,
    pub tenant_id: TenantId,
    tenant: std::sync::Mutex<Option<Arc<Tenant>>>,

    pub lock_guard: (
        Option<RwLockReadGuard<'a, ()>>,
@@ -3512,7 +3743,6 @@ pub mod harness {
    tenant_conf,
    tenant_id,
    lock_guard,
    tenant: std::sync::Mutex::new(None),
})
}

@@ -3561,7 +3791,6 @@ pub mod harness {
    for timeline in tenant.timelines.lock().unwrap().values() {
        timeline.set_state(TimelineState::Active);
    }
    *self.tenant.lock().unwrap() = Some(Arc::clone(&tenant));
    Ok(tenant)
}

@@ -3570,32 +3799,6 @@ pub mod harness {
    }
}

impl<'a> Drop for TenantHarness<'a> {
    fn drop(&mut self) {
        if let Some(tenant) = self.tenant.lock().unwrap().take() {
            // Shutdown with freeze_and_flush so that we don't drop `EphemeralFile` objects in `InMemoryLayer`s.
            // Without this, we remove the ephemeral files on disk but they remain in pageserver's PageCache.
            // This causes write-back failures down the line.
            let tenant = Arc::clone(&tenant);
            std::thread::Builder::new()
                .name("TenantHarness::drop thread".to_owned())
                .spawn(move || {
                    let rt = tokio::runtime::Builder::new_current_thread()
                        .enable_all()
                        .build()
                        .unwrap();
                    rt.block_on(tenant.shutdown(true).instrument(
                        info_span!("tenant_harness_drop_shutdown", tenant_id=%tenant.tenant_id),
                    ))
                    .unwrap()
                })
                .unwrap()
                .join()
                .unwrap();
        }
    }
}

// Mock WAL redo manager that doesn't do much
pub struct TestRedoManager;
@@ -3644,10 +3847,8 @@ mod tests {

#[tokio::test]
async fn test_basic() -> anyhow::Result<()> {
    let harness = TenantHarness::create("test_basic")?;
    let (tenant, ctx) = harness.load().await;
    let tline =
        tenant.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)?;
    let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await;
    let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;

    let writer = tline.writer();
    writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
@@ -3677,12 +3878,12 @@ mod tests {

#[tokio::test]
async fn no_duplicate_timelines() -> anyhow::Result<()> {
    let harness = TenantHarness::create("no_duplicate_timelines")?;
    let harness = harness;
    let (tenant, ctx) = harness.load().await;
    let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
    let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")?
        .load()
        .await;
    let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;

    match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) {
    match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) {
        Ok(_) => panic!("duplicate timeline creation should fail"),
        Err(e) => assert_eq!(
            e.to_string(),
@@ -3709,11 +3910,9 @@ mod tests {
#[tokio::test]
async fn test_branch() -> anyhow::Result<()> {
    use std::str::from_utf8;
    let harness = TenantHarness::create("test_branch")?;
    let harness = harness;
    let (tenant, ctx) = harness.load().await;
    let tline =
        tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;

    let (tenant, ctx) = TenantHarness::create("test_branch")?.load().await;
    let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
    let writer = tline.writer();

    #[allow(non_snake_case)]
@@ -3806,12 +4005,11 @@ mod tests {

#[tokio::test]
async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
    let harness =
        TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?;
    let harness = harness;
    let (tenant, ctx) = harness.load().await;
    let tline =
        tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
    let (tenant, ctx) =
        TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
            .load()
            .await;
    let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
    make_some_layers(tline.as_ref(), Lsn(0x20)).await?;

    // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
@@ -3843,9 +4041,10 @@ mod tests {

#[tokio::test]
async fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> anyhow::Result<()> {
    let harness = TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?;
    let harness = harness;
    let (tenant, ctx) = harness.load().await;
    let (tenant, ctx) =
        TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?
            .load()
            .await;

    let tline =
        tenant.create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)?;
@@ -3893,10 +4092,11 @@ mod tests {

#[tokio::test]
async fn test_get_branchpoints_from_an_inactive_timeline() -> anyhow::Result<()> {
    let harness = TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")?;
    let (tenant, ctx) = harness.load().await;
    let tline =
        tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
    let (tenant, ctx) =
        TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")?
            .load()
            .await;
    let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
    make_some_layers(tline.as_ref(), Lsn(0x20)).await?;

    tenant
@@ -3940,11 +4140,11 @@ mod tests {

#[tokio::test]
async fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
    let harness =
        TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?;
    let (tenant, ctx) = harness.load().await;
    let tline =
        tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
    let (tenant, ctx) =
        TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?
            .load()
            .await;
    let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
    make_some_layers(tline.as_ref(), Lsn(0x20)).await?;

    tenant
@@ -3963,10 +4163,11 @@ mod tests {
}
#[tokio::test]
async fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
    let harness = TenantHarness::create("test_parent_keeps_data_forever_after_branching")?;
    let (tenant, ctx) = harness.load().await;
    let tline =
        tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
    let (tenant, ctx) =
        TenantHarness::create("test_parent_keeps_data_forever_after_branching")?
            .load()
            .await;
    let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
    make_some_layers(tline.as_ref(), Lsn(0x20)).await?;

    tenant
@@ -3999,7 +4200,7 @@ mod tests {
{
    let (tenant, ctx) = harness.load().await;
    let tline =
        tenant.create_test_timeline(TIMELINE_ID, Lsn(0x7000), DEFAULT_PG_VERSION, &ctx)?;
        tenant.create_test_timeline(TIMELINE_ID, Lsn(0x8000), DEFAULT_PG_VERSION, &ctx)?;
    make_some_layers(tline.as_ref(), Lsn(0x8000)).await?;
}

@@ -4019,7 +4220,7 @@ mod tests {
{
    let (tenant, ctx) = harness.load().await;
    let tline =
        tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
        tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;

    make_some_layers(tline.as_ref(), Lsn(0x20)).await?;

@@ -4056,8 +4257,7 @@ mod tests {
let harness = TenantHarness::create(TEST_NAME)?;
let (tenant, ctx) = harness.load().await;

let tline =
    tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
drop(tline);
drop(tenant);

@@ -4094,10 +4294,8 @@ mod tests {

#[tokio::test]
async fn test_images() -> anyhow::Result<()> {
    let harness = TenantHarness::create("test_images")?;
    let (tenant, ctx) = harness.load().await;
    let tline =
        tenant.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)?;
    let (tenant, ctx) = TenantHarness::create("test_images")?.load().await;
    let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;

    let writer = tline.writer();
    writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
@@ -4161,10 +4359,8 @@ mod tests {
//
#[tokio::test]
async fn test_bulk_insert() -> anyhow::Result<()> {
    let harness = TenantHarness::create("test_bulk_insert")?;
    let (tenant, ctx) = harness.load().await;
    let tline =
        tenant.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)?;
    let (tenant, ctx) = TenantHarness::create("test_bulk_insert")?.load().await;
    let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;

    let mut lsn = Lsn(0x10);

@@ -4205,10 +4401,8 @@ mod tests {

#[tokio::test]
async fn test_random_updates() -> anyhow::Result<()> {
    let harness = TenantHarness::create("test_random_updates")?;
    let (tenant, ctx) = harness.load().await;
    let tline =
        tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
    let (tenant, ctx) = TenantHarness::create("test_random_updates")?.load().await;
    let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;

    const NUM_KEYS: usize = 1000;

@@ -4220,7 +4414,7 @@ mod tests {
    // a read sees the latest page version.
    let mut updated = [Lsn(0); NUM_KEYS];

    let mut lsn = Lsn(0x10);
    let mut lsn = Lsn(0);
    #[allow(clippy::needless_range_loop)]
    for blknum in 0..NUM_KEYS {
        lsn = Lsn(lsn.0 + 0x10);
@@ -4278,10 +4472,11 @@ mod tests {

#[tokio::test]
async fn test_traverse_branches() -> anyhow::Result<()> {
    let harness = TenantHarness::create("test_traverse_branches")?;
    let (tenant, ctx) = harness.load().await;
    let (tenant, ctx) = TenantHarness::create("test_traverse_branches")?
        .load()
        .await;
    let mut tline =
        tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
        tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;

    const NUM_KEYS: usize = 1000;

@@ -4293,7 +4488,7 @@ mod tests {
    // a read sees the latest page version.
    let mut updated = [Lsn(0); NUM_KEYS];

    let mut lsn = Lsn(0x10);
    let mut lsn = Lsn(0);
    #[allow(clippy::needless_range_loop)]
    for blknum in 0..NUM_KEYS {
        lsn = Lsn(lsn.0 + 0x10);
@@ -4360,10 +4555,11 @@ mod tests {

#[tokio::test]
async fn test_traverse_ancestors() -> anyhow::Result<()> {
    let harness = TenantHarness::create("test_traverse_ancestors")?;
    let (tenant, ctx) = harness.load().await;
    let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")?
        .load()
        .await;
    let mut tline =
        tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
        tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;

    const NUM_KEYS: usize = 100;
    const NUM_TLINES: usize = 50;
@@ -4372,7 +4568,7 @@ mod tests {
    // Track page mutation lsns across different timelines.
    let mut updated = [[Lsn(0); NUM_KEYS]; NUM_TLINES];

    let mut lsn = Lsn(0x10);
    let mut lsn = Lsn(0);

    #[allow(clippy::needless_range_loop)]
    for idx in 0..NUM_TLINES {
@@ -4418,28 +4614,6 @@ mod tests {
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_empty_test_timeline_is_usable() -> anyhow::Result<()> {
        let harness = TenantHarness::create("test_empty_test_timeline_is_usable")?;
        let (tenant, ctx) = harness.load().await;
        let tline =
            tenant.create_test_timeline(TIMELINE_ID, Lsn(0x20), DEFAULT_PG_VERSION, &ctx)?;

        // Make sure the timeline has the minimum set of required keys for operation.
        // The only operation you can do on an empty timeline is to write new data.
        // Repartition is the only code on the write path that requires other keys to be present.
        // Make sure it works.
        {
            let cache = tline.partitioning.lock().unwrap();
            assert_eq!(cache.1, Lsn(0), "must not have repartitioned yet, otherwise the repartition call below might just use the cache");
        }
        tline
            .repartition(Lsn(0x20), tline.get_compaction_target_size(), &ctx)
            .await?;

        Ok(())
    }
}
#[cfg(not(debug_assertions))]

@@ -45,7 +45,7 @@ pub mod defaults {
}

/// Per-tenant configuration options
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TenantConf {
    // Flush out an inmemory layer, if it's holding WAL older than this
    // This puts a backstop on how much WAL needs to be re-digested if the
@@ -100,11 +100,14 @@ pub struct TenantConf {
    #[serde(with = "humantime_serde")]
    pub evictions_low_residence_duration_metric_threshold: Duration,
    pub gc_feedback: bool,
    // Region for master S3 bucket
    pub master_region: Option<String>,
    pub master_broker_endpoint: Option<String>,
}

/// Same as TenantConf, but this struct preserves the information about
/// which parameters are set and which are not.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct TenantConfOpt {
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(default)]
@@ -180,6 +183,14 @@ pub struct TenantConfOpt {
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(default)]
    pub gc_feedback: Option<bool>,

    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(default)]
    pub master_region: Option<String>,

    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(default)]
    pub master_broker_endpoint: Option<String>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
@@ -248,6 +259,8 @@ impl TenantConfOpt {
        .evictions_low_residence_duration_metric_threshold
        .unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold),
    gc_feedback: self.gc_feedback.unwrap_or(global_conf.gc_feedback),
    master_region: self.master_region.clone(),
    master_broker_endpoint: self.master_broker_endpoint.clone(),
}
}
}
@@ -285,6 +298,8 @@ impl Default for TenantConf {
    )
    .expect("cannot parse default evictions_low_residence_duration_metric_threshold"),
    gc_feedback: false,
    master_region: None,
    master_broker_endpoint: None,
}
}
}
@@ -380,6 +395,8 @@ impl TryFrom<&'_ models::TenantConfig> for TenantConfOpt {
        );
    }
    tenant_conf.gc_feedback = request_data.gc_feedback;
    tenant_conf.master_region = request_data.master_region.clone();
    tenant_conf.master_broker_endpoint = request_data.master_broker_endpoint.clone();

    Ok(tenant_conf)
}
@@ -39,7 +39,7 @@ pub struct EphemeralFile {
    file_id: u64,
    _tenant_id: TenantId,
    _timeline_id: TimelineId,
    file: Option<Arc<VirtualFile>>,
    file: Arc<VirtualFile>,

    pub size: u64,
}
@@ -52,10 +52,7 @@ impl EphemeralFile {
) -> Result<EphemeralFile, io::Error> {
    let mut l = EPHEMERAL_FILES.write().unwrap();
    let file_id = l.next_file_id;
    l.next_file_id = l
        .next_file_id
        .checked_add(1)
        .expect("next_file_id is u64, expecting it to not overflow");
    l.next_file_id += 1;

    let filename = conf
        .timeline_path(&timeline_id, &tenant_id)
@@ -63,30 +60,16 @@ impl EphemeralFile {

    let file = VirtualFile::open_with_options(
        &filename,
        OpenOptions::new()
            .read(true)
            .write(true)
            // The next_file_id doesn't overflow, so technically, `create_new` is not needed.
            // But it's cheap, so why not.
            .create_new(true),
        OpenOptions::new().read(true).write(true).create(true),
    )?;
    let file_rc = Arc::new(file);
    l.files.insert(file_id, file_rc.clone());

    #[cfg(debug_assertions)]
    debug!(
        "created ephemeral file {}\n{}",
        filename.display(),
        std::backtrace::Backtrace::force_capture()
    );
    #[cfg(not(debug_assertions))]
    debug!("created ephemeral file {}", filename.display());

    Ok(EphemeralFile {
        file_id,
        _tenant_id: tenant_id,
        _timeline_id: timeline_id,
        file: Some(file_rc),
        file: file_rc,
        size: 0,
    })
}
@@ -96,8 +79,6 @@ impl EphemeralFile {
    while off < PAGE_SZ {
        let n = self
            .file
            .as_ref()
            .unwrap()
            .read_at(&mut buf[off..], blkno as u64 * PAGE_SZ as u64 + off as u64)?;

        if n == 0 {
@@ -280,43 +261,17 @@ impl Drop for EphemeralFile {
    cache.drop_buffers_for_ephemeral(self.file_id);

    // remove entry from the hash map
    let virtual_file = EPHEMERAL_FILES
        .write()
        .unwrap()
        .files
        .remove(&self.file_id)
        .unwrap();

    // remove file from self
    let self_file = self.file.take().unwrap();

    assert_eq!(
        Arc::as_ptr(&virtual_file) as *const (),
        Arc::as_ptr(&self_file) as *const ()
    );
    drop(self_file);

    // XXX once we upgrade to Rust 1.70, use Arc::into_inner.
    // It does the following checks atomically.
    assert_eq!(Arc::weak_count(&virtual_file), 0);
    let virtual_file = Arc::try_unwrap(virtual_file).expect(
        "we are being dropped and EPHEMERAL_FILES is the only other place where we put the Arc",
    );
    EPHEMERAL_FILES.write().unwrap().files.remove(&self.file_id);

    // unlink the file
    // TODO: we should be able to unwrap here, but, timeline delete and tenant detach do
    // std::fs::remove_dir_all without dropping all InMemoryLayer => EphemeralFile
    // of the tenant => need to fix that first.
    match virtual_file.remove() {
        Ok(()) => (),
        Err((virtual_file, e)) => {
            warn!(
                "could not remove ephemeral file '{}': {}",
                virtual_file.path.display(),
                e
            );
        }
    };
    let res = std::fs::remove_file(&self.file.path);
    if let Err(e) = res {
        warn!(
            "could not remove ephemeral file '{}': {}",
            self.file.path.display(),
            e
        );
    }
}
}
@@ -1,325 +0,0 @@
//! This module contains the encoding and decoding of the local manifest file.
//!
//! MANIFEST is a write-ahead log which is stored locally to each timeline. It
//! records the state of the storage engine. It contains a snapshot of the
//! state and all operations following that snapshot. The file begins with a
//! header recording the MANIFEST version number. After that, it contains a snapshot.
//! The snapshot is followed by a list of operations. Each operation is a list
//! of records. Each record is either an addition or a removal of a layer.
//!
//! With MANIFEST, we can:
//!
//! 1. recover state quickly by reading the file, potentially boosting the
//!    startup speed.
//! 2. ensure all operations are atomic and avoid corruption, solving issues
//!    like redundant image layers and preparing us for future compaction
//!    strategies.
//!
//! There is also a format for storing all layer files on S3, called
//! `index_part.json`. Compared with index_part, MANIFEST is a WAL which
//! records all operations as logs, and therefore we can easily replay the
//! operations when recovering from a crash, while ensuring those operations
//! are atomic upon restart.
//!
//! Currently, this is not used in the system. Future refactors will ensure
//! the storage state will be recorded in this file, and the system can be
//! recovered from this file. This is tracked in
//! https://github.com/neondatabase/neon/issues/4418

use std::io::{self, Read, Write};

use crate::virtual_file::VirtualFile;
use anyhow::Result;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use crc32c::crc32c;
use serde::{Deserialize, Serialize};
use tracing::log::warn;
use utils::lsn::Lsn;

use super::storage_layer::PersistentLayerDesc;

pub struct Manifest {
    file: VirtualFile,
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct Snapshot {
    pub layers: Vec<PersistentLayerDesc>,
}

/// serde by default encodes this as a tagged enum, and therefore it will be something
/// like `{ "AddLayer": { ... } }`.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub enum Record {
    AddLayer(PersistentLayerDesc),
    RemoveLayer(PersistentLayerDesc),
}

/// `echo neon.manifest | sha1sum` and take the leading 8 bytes.
const MANIFEST_MAGIC_NUMBER: u64 = 0xf5c44592b806109c;
const MANIFEST_VERSION: u64 = 1;

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct ManifestHeader {
    magic_number: u64,
    version: u64,
}

const MANIFEST_HEADER_LEN: usize = 16;

impl ManifestHeader {
    fn encode(&self) -> BytesMut {
        let mut buf = BytesMut::with_capacity(MANIFEST_HEADER_LEN);
        buf.put_u64(self.magic_number);
        buf.put_u64(self.version);
        buf
    }

    fn decode(mut buf: &[u8]) -> Self {
        assert!(buf.len() == MANIFEST_HEADER_LEN, "invalid header");
        Self {
            magic_number: buf.get_u64(),
            version: buf.get_u64(),
        }
    }
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub enum Operation {
    /// A snapshot of the current state.
    ///
    /// The Lsn field represents the LSN that is persisted to disk for this snapshot.
    Snapshot(Snapshot, Lsn),
    /// An atomic operation that changes the state.
    ///
    /// The Lsn field represents the LSN that is persisted to disk after the operation is done.
    /// This will only change when a new L0 is flushed to the disk.
    Operation(Vec<Record>, Lsn),
}

struct RecordHeader {
    size: u32,
    checksum: u32,
}

const RECORD_HEADER_LEN: usize = 8;

impl RecordHeader {
    fn encode(&self) -> BytesMut {
        let mut buf = BytesMut::with_capacity(RECORD_HEADER_LEN);
        buf.put_u32(self.size);
        buf.put_u32(self.checksum);
        buf
    }

    fn decode(mut buf: &[u8]) -> Self {
        assert!(buf.len() == RECORD_HEADER_LEN, "invalid header");
        Self {
            size: buf.get_u32(),
            checksum: buf.get_u32(),
        }
    }
}

#[derive(Debug, thiserror::Error)]
pub enum ManifestLoadError {
    #[error("manifest header is corrupted")]
    CorruptedManifestHeader,
    #[error("unsupported manifest version: got {0}, expected {1}")]
    UnsupportedVersion(u64, u64),
    #[error("error when decoding record: {0}")]
    DecodeRecord(serde_json::Error),
    #[error("I/O error: {0}")]
    Io(io::Error),
}

#[must_use = "Should check if the manifest is partially corrupted"]
pub struct ManifestPartiallyCorrupted(bool);
impl Manifest {
    /// Create a new manifest by writing the manifest header and a snapshot record to the given file.
    pub fn init(file: VirtualFile, snapshot: Snapshot, lsn: Lsn) -> Result<Self> {
        let mut manifest = Self { file };
        manifest.append_manifest_header(ManifestHeader {
            magic_number: MANIFEST_MAGIC_NUMBER,
            version: MANIFEST_VERSION,
        })?;
        manifest.append_operation(Operation::Snapshot(snapshot, lsn))?;
        Ok(manifest)
    }

    /// Load a manifest. Returns the manifest and a list of operations. If the manifest is corrupted,
    /// the bool flag will be set to true and the user is responsible for reconstructing a new manifest
    /// and backing up the current one.
    pub fn load(
        mut file: VirtualFile,
    ) -> Result<(Self, Vec<Operation>, ManifestPartiallyCorrupted), ManifestLoadError> {
        let mut buf = vec![];
        file.read_to_end(&mut buf).map_err(ManifestLoadError::Io)?;

        // Read manifest header
        let mut buf = Bytes::from(buf);
        if buf.remaining() < MANIFEST_HEADER_LEN {
            return Err(ManifestLoadError::CorruptedManifestHeader);
        }
        let header = ManifestHeader::decode(&buf[..MANIFEST_HEADER_LEN]);
        buf.advance(MANIFEST_HEADER_LEN);
        if header.version != MANIFEST_VERSION {
            return Err(ManifestLoadError::UnsupportedVersion(
                header.version,
                MANIFEST_VERSION,
            ));
        }

        // Read operations
        let mut operations = Vec::new();
        let corrupted = loop {
            if buf.remaining() == 0 {
                break false;
            }
            if buf.remaining() < RECORD_HEADER_LEN {
                warn!("incomplete header when decoding manifest, could be corrupted");
                break true;
            }
            let RecordHeader { size, checksum } = RecordHeader::decode(&buf[..RECORD_HEADER_LEN]);
            let size = size as usize;
            buf.advance(RECORD_HEADER_LEN);
            if buf.remaining() < size {
                warn!("incomplete data when decoding manifest, could be corrupted");
                break true;
            }
            let data = &buf[..size];
            if crc32c(data) != checksum {
                warn!("checksum mismatch when decoding manifest, could be corrupted");
                break true;
            }
            // if the following decode fails, we cannot use the manifest or safely ignore any record.
            operations.push(serde_json::from_slice(data).map_err(ManifestLoadError::DecodeRecord)?);
            buf.advance(size);
        };
        Ok((
            Self { file },
            operations,
            ManifestPartiallyCorrupted(corrupted),
        ))
    }

    fn append_data(&mut self, data: &[u8]) -> Result<()> {
        if data.len() >= u32::MAX as usize {
            panic!("data too large");
        }
        let header = RecordHeader {
            size: data.len() as u32,
            checksum: crc32c(data),
        };
        let header = header.encode();
        self.file.write_all(&header)?;
        self.file.write_all(data)?;
        self.file.sync_all()?;
        Ok(())
    }

    fn append_manifest_header(&mut self, header: ManifestHeader) -> Result<()> {
        let encoded = header.encode();
        self.file.write_all(&encoded)?;
        Ok(())
    }

    /// Add an operation to the manifest. The operation will be appended to the end of the file,
    /// and the file will be fsynced.
    pub fn append_operation(&mut self, operation: Operation) -> Result<()> {
        let encoded = Vec::from(serde_json::to_string(&operation)?);
        self.append_data(&encoded)
    }
}
#[cfg(test)]
mod tests {
    use std::fs::OpenOptions;

    use crate::repository::Key;

    use super::*;

    #[test]
    fn test_read_manifest() {
        let testdir = crate::config::PageServerConf::test_repo_dir("test_read_manifest");
        std::fs::create_dir_all(&testdir).unwrap();
        let file = VirtualFile::create(&testdir.join("MANIFEST")).unwrap();
        let layer1 = PersistentLayerDesc::new_test(Key::from_i128(0)..Key::from_i128(233));
        let layer2 = PersistentLayerDesc::new_test(Key::from_i128(233)..Key::from_i128(2333));
        let layer3 = PersistentLayerDesc::new_test(Key::from_i128(2333)..Key::from_i128(23333));
        let layer4 = PersistentLayerDesc::new_test(Key::from_i128(23333)..Key::from_i128(233333));

        // Write a manifest with a snapshot and some operations
        let snapshot = Snapshot {
            layers: vec![layer1, layer2],
        };
        let mut manifest = Manifest::init(file, snapshot.clone(), Lsn::from(0)).unwrap();
        manifest
            .append_operation(Operation::Operation(
                vec![Record::AddLayer(layer3.clone())],
                Lsn::from(1),
            ))
            .unwrap();
        drop(manifest);

        // Open the second time and write
        let file = VirtualFile::open_with_options(
            &testdir.join("MANIFEST"),
            OpenOptions::new()
                .read(true)
                .write(true)
                .create_new(false)
                .truncate(false),
        )
        .unwrap();
        let (mut manifest, operations, corrupted) = Manifest::load(file).unwrap();
        assert!(!corrupted.0);
        assert_eq!(operations.len(), 2);
        assert_eq!(
            &operations[0],
            &Operation::Snapshot(snapshot.clone(), Lsn::from(0))
        );
        assert_eq!(
            &operations[1],
            &Operation::Operation(vec![Record::AddLayer(layer3.clone())], Lsn::from(1))
        );
        manifest
            .append_operation(Operation::Operation(
                vec![
                    Record::RemoveLayer(layer3.clone()),
                    Record::AddLayer(layer4.clone()),
                ],
                Lsn::from(2),
            ))
            .unwrap();
        drop(manifest);

        // Open the third time and verify
        let file = VirtualFile::open_with_options(
            &testdir.join("MANIFEST"),
            OpenOptions::new()
                .read(true)
                .write(true)
                .create_new(false)
                .truncate(false),
        )
        .unwrap();
        let (_manifest, operations, corrupted) = Manifest::load(file).unwrap();
        assert!(!corrupted.0);
        assert_eq!(operations.len(), 3);
        assert_eq!(&operations[0], &Operation::Snapshot(snapshot, Lsn::from(0)));
        assert_eq!(
            &operations[1],
            &Operation::Operation(vec![Record::AddLayer(layer3.clone())], Lsn::from(1))
        );
        assert_eq!(
            &operations[2],
            &Operation::Operation(
                vec![Record::RemoveLayer(layer3), Record::AddLayer(layer4)],
                Lsn::from(2)
            )
        );
    }
}
@@ -23,10 +23,7 @@ use crate::config::PageServerConf;
use crate::virtual_file::VirtualFile;

/// Use special format number to enable backward compatibility.
const METADATA_FORMAT_VERSION: u16 = 4;

/// Previous supported format versions.
const METADATA_OLD_FORMAT_VERSION: u16 = 3;
const METADATA_FORMAT_VERSION: u16 = 5;

/// We assume that a write of up to METADATA_MAX_SIZE bytes is atomic.
///
@@ -40,7 +37,7 @@ const METADATA_MAX_SIZE: usize = 512;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimelineMetadata {
    hdr: TimelineMetadataHeader,
    body: TimelineMetadataBodyV2,
    body: TimelineMetadataBodyV3,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -72,6 +69,28 @@ struct TimelineMetadataBodyV2 {
    pg_version: u32,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct TimelineMetadataBodyV3 {
    disk_consistent_lsn: Lsn,
    // This is only set if we know it. We track it in memory when the page
    // server is running, but we only track the value corresponding to
    // 'last_record_lsn', not 'disk_consistent_lsn' which can lag behind by a
    // lot. We only store it in the metadata file when we flush *all* the
    // in-memory data so that 'last_record_lsn' is the same as
    // 'disk_consistent_lsn'. That's OK, because after page server restart, as
    // soon as we reprocess at least one record, we will have a valid
    // 'prev_record_lsn' value in memory again. This is only really needed when
    // doing a clean shutdown, so that there is no more WAL beyond
    // 'disk_consistent_lsn'
    prev_record_lsn: Option<Lsn>,
    ancestor_timeline: Option<TimelineId>,
    ancestor_lsn: Lsn,
    latest_gc_cutoff_lsn: Lsn,
    initdb_lsn: Lsn,
    pg_version: u32,
    replica_lsn: Option<Lsn>,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct TimelineMetadataBodyV1 {
    disk_consistent_lsn: Lsn,
@@ -101,6 +120,7 @@ impl TimelineMetadata {
    latest_gc_cutoff_lsn: Lsn,
    initdb_lsn: Lsn,
    pg_version: u32,
    replica_lsn: Option<Lsn>,
) -> Self {
    Self {
        hdr: TimelineMetadataHeader {
@@ -108,7 +128,7 @@ impl TimelineMetadata {
            size: 0,
            format_version: METADATA_FORMAT_VERSION,
        },
        body: TimelineMetadataBodyV2 {
        body: TimelineMetadataBodyV3 {
            disk_consistent_lsn,
            prev_record_lsn,
            ancestor_timeline,
@@ -116,35 +136,48 @@ impl TimelineMetadata {
            latest_gc_cutoff_lsn,
            initdb_lsn,
            pg_version,
            replica_lsn,
        },
    }
}

fn upgrade_timeline_metadata(metadata_bytes: &[u8]) -> anyhow::Result<Self> {
    let mut hdr = TimelineMetadataHeader::des(&metadata_bytes[0..METADATA_HDR_SIZE])?;

    // backward compatible only up to this version
    ensure!(
        hdr.format_version == METADATA_OLD_FORMAT_VERSION,
        "unsupported metadata format version {}",
        hdr.format_version
    );

    let metadata_size = hdr.size as usize;

    let body: TimelineMetadataBodyV1 =
        TimelineMetadataBodyV1::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
    let body = match hdr.format_version {
        3 => {
            let body: TimelineMetadataBodyV1 =
                TimelineMetadataBodyV1::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;

            let body = TimelineMetadataBodyV2 {
                disk_consistent_lsn: body.disk_consistent_lsn,
                prev_record_lsn: body.prev_record_lsn,
                ancestor_timeline: body.ancestor_timeline,
                ancestor_lsn: body.ancestor_lsn,
                latest_gc_cutoff_lsn: body.latest_gc_cutoff_lsn,
                initdb_lsn: body.initdb_lsn,
                pg_version: 14, // All timelines created before this version had pg_version 14
            TimelineMetadataBodyV3 {
                disk_consistent_lsn: body.disk_consistent_lsn,
                prev_record_lsn: body.prev_record_lsn,
                ancestor_timeline: body.ancestor_timeline,
                ancestor_lsn: body.ancestor_lsn,
                latest_gc_cutoff_lsn: body.latest_gc_cutoff_lsn,
                initdb_lsn: body.initdb_lsn,
                pg_version: 14, // All timelines created before this version had pg_version 14
                replica_lsn: None,
            }
        }
        4 => {
            let body: TimelineMetadataBodyV2 =
                TimelineMetadataBodyV2::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;

            TimelineMetadataBodyV3 {
                disk_consistent_lsn: body.disk_consistent_lsn,
                prev_record_lsn: body.prev_record_lsn,
                ancestor_timeline: body.ancestor_timeline,
                ancestor_lsn: body.ancestor_lsn,
                latest_gc_cutoff_lsn: body.latest_gc_cutoff_lsn,
                initdb_lsn: body.initdb_lsn,
                pg_version: body.pg_version, // pg_version is preserved from the V2 body
                replica_lsn: None,
            }
        }
        _ => bail!("unsupported metadata format version {}", hdr.format_version),
    };

    hdr.format_version = METADATA_FORMAT_VERSION;

    Ok(Self { hdr, body })
||||
@@ -174,7 +207,7 @@ impl TimelineMetadata {
|
||||
TimelineMetadata::upgrade_timeline_metadata(metadata_bytes)
|
||||
} else {
|
||||
let body =
|
||||
TimelineMetadataBodyV2::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
|
||||
TimelineMetadataBodyV3::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
|
||||
ensure!(
|
||||
body.disk_consistent_lsn.is_aligned(),
|
||||
"disk_consistent_lsn is not aligned"
|
||||
@@ -227,6 +260,10 @@ impl TimelineMetadata {
|
||||
pub fn pg_version(&self) -> u32 {
|
||||
self.body.pg_version
|
||||
}
|
||||
|
||||
pub fn replica_lsn(&self) -> Option<Lsn> {
|
||||
self.body.replica_lsn
|
||||
}
|
||||
}
|
||||
|
||||
/// Save timeline metadata to file
|
||||
@@ -330,7 +367,7 @@ mod tests {
|
||||
hdr: TimelineMetadataHeader {
|
||||
checksum: 0,
|
||||
size: 0,
|
||||
format_version: METADATA_OLD_FORMAT_VERSION,
|
||||
format_version: 3,
|
||||
},
|
||||
body: TimelineMetadataBodyV1 {
|
||||
disk_consistent_lsn: Lsn(0x200),
|
||||
|
||||
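A minimal sketch of how the new field round-trips through the V3 format, assuming test-style values (the call below is illustrative, not part of the patch; the argument order follows the constructor hunk above):

// Hypothetical round-trip of replica_lsn through TimelineMetadata V3.
let meta = TimelineMetadata::new(
    Lsn(0x200),       // disk_consistent_lsn
    Some(Lsn(0x100)), // prev_record_lsn
    None,             // ancestor_timeline
    Lsn(0),           // ancestor_lsn
    Lsn(0),           // latest_gc_cutoff_lsn
    Lsn(0),           // initdb_lsn
    14,               // pg_version
    Some(Lsn(0x180)), // replica_lsn: set only for cross-region replicas
);
assert_eq!(meta.replica_lsn(), Some(Lsn(0x180)));

Serialization goes through TimelineMetadataBodyV3; V1 and V2 files deserialize with replica_lsn: None, as the upgrade match above shows.
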
@@ -348,7 +348,7 @@ pub async fn set_new_tenant_config(
Tenant::persist_tenant_config(
&tenant.tenant_id(),
&tenant_config_path,
new_tenant_conf,
new_tenant_conf.clone(),
false,
)
.map_err(SetNewTenantConfigError::Persist)?;

@@ -1392,7 +1392,7 @@ mod tests {
let harness = TenantHarness::create(test_name)?;
let (tenant, ctx) = runtime.block_on(harness.load());
// create an empty timeline directory
let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)?;
let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;

let remote_fs_dir = harness.conf.workdir.join("remote_fs");
std::fs::create_dir_all(remote_fs_dir)?;

@@ -12,7 +12,7 @@ use utils::id::{TenantId, TimelineId};
use super::index::LayerFileMetadata;

/// Serializes and uploads the given index part data to the remote storage.
pub(super) async fn upload_index_part<'a>(
pub async fn upload_index_part<'a>(
conf: &'static PageServerConf,
storage: &'a GenericRemoteStorage,
tenant_id: TenantId,

@@ -917,7 +917,7 @@ impl Drop for DeltaLayerWriter {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
match inner.blob_writer.into_inner().into_inner() {
Ok(vfile) => vfile.remove().unwrap(),
Ok(vfile) => vfile.remove(),
Err(err) => warn!(
"error while flushing buffer of image layer temporary file: {}",
err

@@ -215,6 +215,18 @@ impl LayerFileName {
Self::Delta(fname) => fname.to_string(),
}
}
pub fn get_lsn_range(&self) -> Range<Lsn> {
match self {
Self::Image(fname) => fname.lsn..fname.lsn + 1,
Self::Delta(fname) => fname.lsn_range.clone(),
}
}
pub fn get_key_range(&self) -> Range<Key> {
match self {
Self::Image(fname) => fname.key_range.clone(),
Self::Delta(fname) => fname.key_range.clone(),
}
}
}

impl From<ImageFileName> for LayerFileName {

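A short sketch of the new accessors (the `img: ImageFileName` value and the enclosing scope are assumed, not part of the patch):

// Both layer kinds now expose their ranges uniformly; an image layer
// reports the degenerate LSN range lsn..lsn + 1.
let name = LayerFileName::from(img); // via the From impl above
let lsn_range: Range<Lsn> = name.get_lsn_range();
let key_range: Range<Key> = name.get_key_range();

get_lsn_range is what lets the download path compare a layer's start LSN against replica_lsn further below.
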
@@ -709,7 +709,7 @@ impl ImageLayerWriter {
impl Drop for ImageLayerWriter {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
inner.blob_writer.into_inner().remove().unwrap();
inner.blob_writer.into_inner().remove();
}
}
}

@@ -9,12 +9,10 @@ use crate::{context::RequestContext, repository::Key};

use super::{DeltaFileName, ImageFileName, LayerFileName};

use serde::{Deserialize, Serialize};

/// A unique identifier of a persistent layer. This is different from `LayerDescriptor`, which is only used in the
/// benchmarks. This struct contains all necessary information to find the image / delta layer. It also provides
/// a unified way to generate layer information like file name.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct PersistentLayerDesc {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
@@ -52,19 +50,6 @@ impl PersistentLayerDesc {
self.filename().file_name()
}

#[cfg(test)]
pub fn new_test(key_range: Range<Key>) -> Self {
Self {
tenant_id: TenantId::generate(),
timeline_id: TimelineId::generate(),
key_range,
lsn_range: Lsn(0)..Lsn(1),
is_delta: false,
is_incremental: false,
file_size: 0,
}
}

pub fn new_img(
tenant_id: TenantId,
timeline_id: TimelineId,

@@ -144,6 +144,16 @@ pub struct Timeline {
/// See [`storage_sync`] module comment for details.
pub remote_client: Option<Arc<RemoteTimelineClient>>,

/// Master remote storage client (for a cross-region pageserver replica).
/// All layers created by the replica are stored in the local region's S3 bucket,
/// but the pageserver may need to download older layers from the master's S3 bucket.
pub master_client: Option<Arc<RemoteTimelineClient>>,

/// Remote consistent LSN at which the cross-region replica was created.
/// All layers whose start LSN is smaller than this point should be downloaded
/// from the master's S3 bucket (see `master_client`).
pub replica_lsn: Option<Lsn>,

// What page versions do we hold in the repository? If we get a
// request > last_record_lsn, we need to wait until we receive all
// the WAL up to the request. The SeqWait provides functions for
@@ -216,7 +226,7 @@ pub struct Timeline {
pub initdb_lsn: Lsn,

/// When did we last calculate the partitioning?
pub(super) partitioning: Mutex<(KeyPartitioning, Lsn)>,
partitioning: Mutex<(KeyPartitioning, Lsn)>,

/// Configuration: how often should the partitioning be recalculated.
repartition_threshold: u64,
@@ -684,11 +694,8 @@ impl Timeline {
/// Flush to disk all data that was written with the put_* functions
#[instrument(skip(self), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id))]
pub async fn freeze_and_flush(&self) -> anyhow::Result<()> {
debug!("start");
self.freeze_inmem_layer(false);
let ret = self.flush_frozen_layers_and_wait().await;
debug!(is_err = ret.is_err(), "complete");
ret
self.flush_frozen_layers_and_wait().await
}

/// Outermost timeline compaction operation; downloads needed layers.
@@ -1059,7 +1066,10 @@ impl Timeline {
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) };
let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) };
if self.remote_client.is_none() {
if self
.get_download_source(remote_layer.get_lsn_range().start)
.is_none()
{
return Ok(Some(false));
}

@@ -1309,7 +1319,7 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
}

pub(super) fn get_compaction_target_size(&self) -> u64 {
fn get_compaction_target_size(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.compaction_target_size
@@ -1373,6 +1383,17 @@ impl Timeline {
}
}

fn get_download_source(&self, layer_start_lsn: Lsn) -> Option<&Arc<RemoteTimelineClient>> {
self.replica_lsn
.map_or(self.remote_client.as_ref(), |replica_lsn| {
if layer_start_lsn < replica_lsn {
self.master_client.as_ref()
} else {
self.remote_client.as_ref()
}
})
}

/// Open a Timeline handle.
///
/// Loads the metadata for the timeline into memory, but not the layer map.
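The same rule restated as a standalone function, for clarity (a hedged sketch; the helper name is hypothetical, and `Lsn` comes from `utils::lsn`):

use utils::lsn::Lsn;

// `true` means: fetch this layer from the master region's bucket,
// because its start LSN predates the creation of the replica.
fn use_master_bucket(replica_lsn: Option<Lsn>, layer_start_lsn: Lsn) -> bool {
    match replica_lsn {
        Some(replica_lsn) => layer_start_lsn < replica_lsn,
        None => false, // not a cross-region replica: always use the local client
    }
}

For example, a replica created at Lsn(0x100) fetches a layer starting at Lsn(0x80) from the master bucket, but one starting at Lsn(0x100) from its own region's bucket.
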
@@ -1386,6 +1407,8 @@ impl Timeline {
tenant_id: TenantId,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
remote_client: Option<RemoteTimelineClient>,
master_client: Option<RemoteTimelineClient>,
replica_lsn: Option<Lsn>,
pg_version: u32,
initial_logical_size_can_start: Option<completion::Barrier>,
initial_logical_size_attempt: Option<completion::Completion>,
@@ -1421,6 +1444,9 @@ impl Timeline {

remote_client: remote_client.map(Arc::new),

master_client: master_client.map(Arc::new),
replica_lsn,

// initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
last_record_lsn: SeqWait::new(RecordLsn {
last: disk_consistent_lsn,
@@ -1708,7 +1734,7 @@ impl Timeline {
Ok(())
}

async fn create_remote_layers(
pub async fn create_remote_layers(
&self,
index_part: &IndexPart,
local_layers: HashMap<LayerFileName, Arc<dyn PersistentLayer>>,
@@ -2389,11 +2415,8 @@ impl Timeline {
ValueReconstructResult::Missing => {
return Err(layer_traversal_error(
format!(
"could not find data for key {} at LSN {}, for request at LSN {}\n{}",
key,
cont_lsn,
request_lsn,
std::backtrace::Backtrace::force_capture(),
"could not find data for key {} at LSN {}, for request at LSN {}",
key, cont_lsn, request_lsn
),
traversal_path,
));
@@ -2865,21 +2888,14 @@ impl Timeline {
// in-memory layer from the map now.
{
let mut layers = self.layers.write().unwrap();
let l = layers.frozen_layers.pop_front().unwrap();
let l = layers.frozen_layers.pop_front();

// Only one thread may call this function at a time (for this
// timeline). If two threads tried to flush the same frozen
// layer to disk at the same time, that would not work.
assert!(LayerMap::compare_arced_layers(&l, &frozen_layer));
drop(frozen_layer);
// XXX once we upgrade to Rust 1.70, use Arc::into_inner.
// It does the following checks atomically.
assert_eq!(Arc::weak_count(&l), 0);
let l =
Arc::try_unwrap(l).expect("no-one except us holds references to this layer");
drop(layers); // don't hold layer map lock when doing disk IO
info!("dropping frozen layer, this should remove the ephemeral file on disk");
drop(l);
assert!(LayerMap::compare_arced_layers(&l.unwrap(), &frozen_layer));

// release lock on 'layers'
}

fail_point!("checkpoint-after-sync");
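The XXX above refers to the Rust 1.70+ form, sketched here for reference (not part of the patch):

use std::sync::Arc;

// Arc::into_inner returns Some only when this is the last strong
// reference, so the ownership check and the unwrap happen in one step.
fn unwrap_sole_owner<T>(l: Arc<T>) -> T {
    Arc::into_inner(l).expect("no-one except us holds references to this layer")
}
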
@@ -2938,6 +2954,7 @@ impl Timeline {
*self.latest_gc_cutoff_lsn.read(),
self.initdb_lsn,
self.pg_version,
self.replica_lsn,
);

fail_point!("checkpoint-before-saving-metadata", |x| bail!(
@@ -3013,7 +3030,7 @@ impl Timeline {
Ok((new_delta_filename, LayerFileMetadata::new(sz)))
}

pub(super) async fn repartition(
async fn repartition(
&self,
lsn: Lsn,
partition_size: u64,
@@ -3100,7 +3117,12 @@ impl Timeline {
layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold))?;

max_deltas = max_deltas.max(num_deltas);
if num_deltas >= threshold {
// Create a new image layer if there are at least `threshold` delta layers since the last image layer...
if num_deltas >= threshold
// ...or this is a master layer for a cross-region replica: force generation of an image
// layer in this case to make the replica independent of the master.
|| img_lsn <= self.replica_lsn.unwrap_or(Lsn(0))
{
debug!(
"key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",
img_range.start, img_range.end, num_deltas, img_lsn, lsn
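The extended predicate restated as a standalone function (a hedged sketch with a hypothetical name; `Lsn` from `utils::lsn`):

use utils::lsn::Lsn;

// Create an image layer either when enough deltas have accumulated, or
// when the newest image for this key range is at or before replica_lsn,
// i.e. it still lives only in the master's bucket.
fn want_image_layer(num_deltas: usize, threshold: usize, img_lsn: Lsn, replica_lsn: Option<Lsn>) -> bool {
    num_deltas >= threshold || img_lsn <= replica_lsn.unwrap_or(Lsn(0))
}

With threshold 3 and a replica created at Lsn(0x100), an image at img_lsn Lsn(0x80) is regenerated even if only one delta has arrived since.
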
@@ -3297,6 +3319,13 @@ impl Timeline {
let mut level0_deltas = layers.get_level0_deltas()?;
drop(layers);

// Do not compact L0 deltas from the master for a cross-region replica,
// because master and replica layers are distinguished by LSN,
// and L0 and L1 layers have the same LSN range.
if let Some(replica_lsn) = &self.replica_lsn {
level0_deltas.retain(|l| l.get_lsn_range().start >= *replica_lsn);
}

// Only compact if enough layers have accumulated.
let threshold = self.get_compaction_threshold();
if level0_deltas.is_empty() || level0_deltas.len() < threshold {
@@ -4204,7 +4233,7 @@ impl Timeline {
&format!("download layer {}", remote_layer.short_id()),
false,
async move {
let remote_client = self_clone.remote_client.as_ref().unwrap();
let remote_client = self_clone.get_download_source(remote_layer.get_lsn_range().start).unwrap();

// Does retries + exponential back-off internally.
// When this fails, don't layer further retry attempts here.

@@ -1324,7 +1324,7 @@ mod tests {
async fn dummy_state(harness: &TenantHarness<'_>) -> ConnectionManagerState {
let (tenant, ctx) = harness.load().await;
let timeline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x8), crate::DEFAULT_PG_VERSION, &ctx)
.create_test_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION, &ctx)
.expect("Failed to create an empty timeline for dummy wal connection manager");

ConnectionManagerState {

@@ -324,8 +324,16 @@ impl VirtualFile {
Ok(result)
}

/// Idempotently close the file descriptor we might have or have not open for this VirtualFile.
pub fn close(&mut self) {
pub fn remove(self) {
let path = self.path.clone();
drop(self);
std::fs::remove_file(path).expect("failed to remove the virtual file");
}
}

impl Drop for VirtualFile {
/// If a VirtualFile is dropped, close the underlying file if it was open.
fn drop(&mut self) {
let handle = self.handle.get_mut().unwrap();

// We could check with a read-lock first, to avoid waiting on an
@@ -343,26 +351,6 @@ impl VirtualFile {
.observe_closure_duration(|| slot_guard.file.take());
}
}

/// Caller can retry if we return an `Err`.
#[allow(clippy::result_large_err)]
pub fn remove(mut self) -> Result<(), (Self, std::io::Error)> {
// close our fd before unlink system call, so that the unlink actually performs the removal
self.close();
// Try to remove file on disk.
// If it fails, we idempotently closed the fd, but the caller can choose to retry.
match std::fs::remove_file(&self.path) {
Ok(()) => Ok(()),
Err(e) => Err((self, e)),
}
}
}

impl Drop for VirtualFile {
/// If a VirtualFile is dropped, close the underlying file if it was open.
fn drop(&mut self) {
self.close();
}
}

impl Read for VirtualFile {

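A usage sketch of the new signature (the `path` variable is assumed, and `VirtualFile::create` stands in for however the file was opened): `remove` now consumes the `VirtualFile`, so the descriptor is closed by `Drop` before the unlink, and failure panics rather than handing the file back for a retry.

let file = VirtualFile::create(&path)?; // `path` assumed in scope
// ... write and flush ...
file.remove(); // closes the fd via Drop, then unlinks; panics on error

This trades the old retryable Result<(), (Self, std::io::Error)> contract for simpler call sites, as the two Drop-writer hunks above show.
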
@@ -1208,8 +1208,7 @@ mod tests {

#[tokio::test]
async fn test_relsize() -> Result<()> {
let harness = TenantHarness::create("test_relsize")?;
let (tenant, ctx) = harness.load().await;
let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;

@@ -1428,8 +1427,7 @@ mod tests {
// and then created it again within the same layer.
#[tokio::test]
async fn test_drop_extend() -> Result<()> {
let harness = TenantHarness::create("test_drop_extend")?;
let (tenant, ctx) = harness.load().await;
let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;

@@ -1498,8 +1496,7 @@ mod tests {
// and then extended it again within the same layer.
#[tokio::test]
async fn test_truncate_extend() -> Result<()> {
let harness = TenantHarness::create("test_truncate_extend")?;
let (tenant, ctx) = harness.load().await;
let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;

@@ -1639,8 +1636,7 @@ mod tests {
/// split into multiple 1 GB segments in Postgres.
#[tokio::test]
async fn test_large_rel() -> Result<()> {
let harness = TenantHarness::create("test_large_rel")?;
let (tenant, ctx) = harness.load().await;
let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;

@@ -1,63 +1,10 @@
from contextlib import closing

import pytest
import requests
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.benchmark_fixture import NeonBenchmarker
from fixtures.neon_fixtures import NeonEnvBuilder


# Just start and measure duration.
#
# This test runs pretty quickly and can be informative when used in combination
# with emulated network delay. Some useful delay commands:
#
# 1. Add 2msec delay to all localhost traffic
# `sudo tc qdisc add dev lo root handle 1:0 netem delay 2msec`
#
# 2. Test that it works (you should see 4ms ping)
# `ping localhost`
#
# 3. Revert back to normal
# `sudo tc qdisc del dev lo root netem`
#
# NOTE this test might not represent the real startup time because the basebackup
# for a large database might be larger if there's a lot of transaction metadata,
# or safekeepers might need more syncing, or there might be more operations to
# apply during config step, like more users, databases, or extensions. By default
# we load extensions 'neon,pg_stat_statements,timescaledb,pg_cron', but in this
# test we only load neon.
def test_startup_simple(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()

env.neon_cli.create_branch("test_startup")

# We do two iterations so we can see if the second startup is faster. It should
# be because the compute node should already be configured with roles, databases,
# extensions, etc from the first run.
for i in range(2):
# Start
with zenbenchmark.record_duration(f"{i}_start_and_select"):
endpoint = env.endpoints.create_start("test_startup")
endpoint.safe_psql("select 1;")

# Get metrics
metrics = requests.get(f"http://localhost:{endpoint.http_port}/metrics.json").json()
durations = {
"wait_for_spec_ms": f"{i}_wait_for_spec",
"sync_safekeepers_ms": f"{i}_sync_safekeepers",
"basebackup_ms": f"{i}_basebackup",
"config_ms": f"{i}_config",
"total_startup_ms": f"{i}_total_startup",
}
for key, name in durations.items():
value = metrics[key]
zenbenchmark.record(name, value, "ms", report=MetricReport.LOWER_IS_BETTER)

# Stop so we can restart
endpoint.stop()


# This test sometimes runs for longer than the global 5 minute timeout.
@pytest.mark.timeout(600)
def test_startup(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):