mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-22 12:52:55 +00:00
Compare commits
1 Commits
conrad/pro
...
remove_ini
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bd235a5fe3 |
@@ -4653,74 +4653,6 @@ mod tests {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_write_at_initdb_lsn_takes_optimization_code_path() -> anyhow::Result<()> {
|
|
||||||
let (tenant, ctx) = TenantHarness::create("test_empty_test_timeline_is_usable")?
|
|
||||||
.load()
|
|
||||||
.await;
|
|
||||||
|
|
||||||
let initdb_lsn = Lsn(0x20);
|
|
||||||
let utline = tenant
|
|
||||||
.create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)
|
|
||||||
.await?;
|
|
||||||
let tline = utline.raw_timeline().unwrap();
|
|
||||||
|
|
||||||
// Spawn flush loop now so that we can set the `expect_initdb_optimization`
|
|
||||||
tline.maybe_spawn_flush_loop();
|
|
||||||
|
|
||||||
// Make sure the timeline has the minimum set of required keys for operation.
|
|
||||||
// The only operation you can always do on an empty timeline is to `put` new data.
|
|
||||||
// Except if you `put` at `initdb_lsn`.
|
|
||||||
// In that case, there's an optimization to directly create image layers instead of delta layers.
|
|
||||||
// It uses `repartition()`, which assumes some keys to be present.
|
|
||||||
// Let's make sure the test timeline can handle that case.
|
|
||||||
{
|
|
||||||
let mut state = tline.flush_loop_state.lock().unwrap();
|
|
||||||
assert_eq!(
|
|
||||||
timeline::FlushLoopState::Running {
|
|
||||||
expect_initdb_optimization: false,
|
|
||||||
initdb_optimization_count: 0,
|
|
||||||
},
|
|
||||||
*state
|
|
||||||
);
|
|
||||||
*state = timeline::FlushLoopState::Running {
|
|
||||||
expect_initdb_optimization: true,
|
|
||||||
initdb_optimization_count: 0,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
// Make writes at the initdb_lsn. When we flush it below, it should be handled by the optimization.
|
|
||||||
// As explained above, the optimization requires some keys to be present.
|
|
||||||
// As per `create_empty_timeline` documentation, use init_empty to set them.
|
|
||||||
// This is what `create_test_timeline` does, by the way.
|
|
||||||
let mut modification = tline.begin_modification(initdb_lsn);
|
|
||||||
modification
|
|
||||||
.init_empty_test_timeline()
|
|
||||||
.context("init_empty_test_timeline")?;
|
|
||||||
modification
|
|
||||||
.commit(&ctx)
|
|
||||||
.await
|
|
||||||
.context("commit init_empty_test_timeline modification")?;
|
|
||||||
|
|
||||||
// Do the flush. The flush code will check the expectations that we set above.
|
|
||||||
tline.freeze_and_flush().await?;
|
|
||||||
|
|
||||||
// assert freeze_and_flush exercised the initdb optimization
|
|
||||||
{
|
|
||||||
let state = tline.flush_loop_state.lock().unwrap();
|
|
||||||
let timeline::FlushLoopState::Running {
|
|
||||||
expect_initdb_optimization,
|
|
||||||
initdb_optimization_count,
|
|
||||||
} = *state
|
|
||||||
else {
|
|
||||||
panic!("unexpected state: {:?}", *state);
|
|
||||||
};
|
|
||||||
assert!(expect_initdb_optimization);
|
|
||||||
assert!(initdb_optimization_count > 0);
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_uninit_mark_crash() -> anyhow::Result<()> {
|
async fn test_uninit_mark_crash() -> anyhow::Result<()> {
|
||||||
let name = "test_uninit_mark_crash";
|
let name = "test_uninit_mark_crash";
|
||||||
|
|||||||
@@ -95,12 +95,7 @@ use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenant
|
|||||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||||
pub(super) enum FlushLoopState {
|
pub(super) enum FlushLoopState {
|
||||||
NotStarted,
|
NotStarted,
|
||||||
Running {
|
Running,
|
||||||
#[cfg(test)]
|
|
||||||
expect_initdb_optimization: bool,
|
|
||||||
#[cfg(test)]
|
|
||||||
initdb_optimization_count: usize,
|
|
||||||
},
|
|
||||||
Exited,
|
Exited,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1466,7 +1461,7 @@ impl Timeline {
|
|||||||
let mut flush_loop_state = self.flush_loop_state.lock().unwrap();
|
let mut flush_loop_state = self.flush_loop_state.lock().unwrap();
|
||||||
match *flush_loop_state {
|
match *flush_loop_state {
|
||||||
FlushLoopState::NotStarted => (),
|
FlushLoopState::NotStarted => (),
|
||||||
FlushLoopState::Running { .. } => {
|
FlushLoopState::Running => {
|
||||||
info!(
|
info!(
|
||||||
"skipping attempt to start flush_loop twice {}/{}",
|
"skipping attempt to start flush_loop twice {}/{}",
|
||||||
self.tenant_id, self.timeline_id
|
self.tenant_id, self.timeline_id
|
||||||
@@ -1486,12 +1481,7 @@ impl Timeline {
|
|||||||
let self_clone = Arc::clone(self);
|
let self_clone = Arc::clone(self);
|
||||||
|
|
||||||
debug!("spawning flush loop");
|
debug!("spawning flush loop");
|
||||||
*flush_loop_state = FlushLoopState::Running {
|
*flush_loop_state = FlushLoopState::Running;
|
||||||
#[cfg(test)]
|
|
||||||
expect_initdb_optimization: false,
|
|
||||||
#[cfg(test)]
|
|
||||||
initdb_optimization_count: 0,
|
|
||||||
};
|
|
||||||
task_mgr::spawn(
|
task_mgr::spawn(
|
||||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||||
task_mgr::TaskKind::LayerFlushTask,
|
task_mgr::TaskKind::LayerFlushTask,
|
||||||
@@ -1503,7 +1493,7 @@ impl Timeline {
|
|||||||
let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error);
|
let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error);
|
||||||
self_clone.flush_loop(layer_flush_start_rx, &background_ctx).await;
|
self_clone.flush_loop(layer_flush_start_rx, &background_ctx).await;
|
||||||
let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap();
|
let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap();
|
||||||
assert!(matches!(*flush_loop_state, FlushLoopState::Running{ ..}));
|
assert!(matches!(*flush_loop_state, FlushLoopState::Running));
|
||||||
*flush_loop_state = FlushLoopState::Exited;
|
*flush_loop_state = FlushLoopState::Exited;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -2450,7 +2440,7 @@ impl Timeline {
|
|||||||
let mut my_flush_request = 0;
|
let mut my_flush_request = 0;
|
||||||
|
|
||||||
let flush_loop_state = { *self.flush_loop_state.lock().unwrap() };
|
let flush_loop_state = { *self.flush_loop_state.lock().unwrap() };
|
||||||
if !matches!(flush_loop_state, FlushLoopState::Running { .. }) {
|
if !matches!(flush_loop_state, FlushLoopState::Running) {
|
||||||
anyhow::bail!("cannot flush frozen layers when flush_loop is not running, state is {flush_loop_state:?}")
|
anyhow::bail!("cannot flush frozen layers when flush_loop is not running, state is {flush_loop_state:?}")
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2503,65 +2493,8 @@ impl Timeline {
|
|||||||
frozen_layer: Arc<InMemoryLayer>,
|
frozen_layer: Arc<InMemoryLayer>,
|
||||||
ctx: &RequestContext,
|
ctx: &RequestContext,
|
||||||
) -> Result<(), FlushLayerError> {
|
) -> Result<(), FlushLayerError> {
|
||||||
// As a special case, when we have just imported an image into the repository,
|
|
||||||
// instead of writing out a L0 delta layer, we directly write out image layer
|
|
||||||
// files instead. This is possible as long as *all* the data imported into the
|
|
||||||
// repository have the same LSN.
|
|
||||||
let lsn_range = frozen_layer.get_lsn_range();
|
let lsn_range = frozen_layer.get_lsn_range();
|
||||||
let (layers_to_upload, delta_layer_to_add) =
|
let layer = self.create_delta_layer(&frozen_layer, ctx).await?;
|
||||||
if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) {
|
|
||||||
#[cfg(test)]
|
|
||||||
match &mut *self.flush_loop_state.lock().unwrap() {
|
|
||||||
FlushLoopState::NotStarted | FlushLoopState::Exited => {
|
|
||||||
panic!("flush loop not running")
|
|
||||||
}
|
|
||||||
FlushLoopState::Running {
|
|
||||||
initdb_optimization_count,
|
|
||||||
..
|
|
||||||
} => {
|
|
||||||
*initdb_optimization_count += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
|
|
||||||
// require downloading anything during initial import.
|
|
||||||
let (partitioning, _lsn) = self
|
|
||||||
.repartition(self.initdb_lsn, self.get_compaction_target_size(), ctx)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
if self.cancel.is_cancelled() {
|
|
||||||
return Err(FlushLayerError::Cancelled);
|
|
||||||
}
|
|
||||||
|
|
||||||
// For image layers, we add them immediately into the layer map.
|
|
||||||
(
|
|
||||||
self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
|
|
||||||
.await?,
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
} else {
|
|
||||||
#[cfg(test)]
|
|
||||||
match &mut *self.flush_loop_state.lock().unwrap() {
|
|
||||||
FlushLoopState::NotStarted | FlushLoopState::Exited => {
|
|
||||||
panic!("flush loop not running")
|
|
||||||
}
|
|
||||||
FlushLoopState::Running {
|
|
||||||
expect_initdb_optimization,
|
|
||||||
..
|
|
||||||
} => {
|
|
||||||
assert!(!*expect_initdb_optimization, "expected initdb optimization");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Normal case, write out a L0 delta layer file.
|
|
||||||
// `create_delta_layer` will not modify the layer map.
|
|
||||||
// We will remove frozen layer and add delta layer in one atomic operation later.
|
|
||||||
let layer = self.create_delta_layer(&frozen_layer, ctx).await?;
|
|
||||||
(
|
|
||||||
// FIXME: even though we have a single image and single delta layer assumption
|
|
||||||
// we push them to vec
|
|
||||||
vec![layer.clone()],
|
|
||||||
Some(layer),
|
|
||||||
)
|
|
||||||
};
|
|
||||||
|
|
||||||
if self.cancel.is_cancelled() {
|
if self.cancel.is_cancelled() {
|
||||||
return Err(FlushLayerError::Cancelled);
|
return Err(FlushLayerError::Cancelled);
|
||||||
@@ -2580,18 +2513,17 @@ impl Timeline {
|
|||||||
return Err(FlushLayerError::Cancelled);
|
return Err(FlushLayerError::Cancelled);
|
||||||
}
|
}
|
||||||
|
|
||||||
guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer, &self.metrics);
|
guard.finish_flush_l0_layer(&layer, &frozen_layer, &self.metrics);
|
||||||
|
|
||||||
if disk_consistent_lsn != old_disk_consistent_lsn {
|
if disk_consistent_lsn != old_disk_consistent_lsn {
|
||||||
assert!(disk_consistent_lsn > old_disk_consistent_lsn);
|
assert!(disk_consistent_lsn > old_disk_consistent_lsn);
|
||||||
self.disk_consistent_lsn.store(disk_consistent_lsn);
|
self.disk_consistent_lsn.store(disk_consistent_lsn);
|
||||||
|
|
||||||
// Schedule remote uploads that will reflect our new disk_consistent_lsn
|
// Schedule remote uploads that will reflect our new disk_consistent_lsn
|
||||||
Some(self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?)
|
Some(self.schedule_uploads(disk_consistent_lsn, [layer])?)
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
// release lock on 'layers'
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
|
// FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
|
||||||
|
|||||||
@@ -164,7 +164,7 @@ impl LayerManager {
|
|||||||
/// Flush a frozen layer and add the written delta layer to the layer map.
|
/// Flush a frozen layer and add the written delta layer to the layer map.
|
||||||
pub(crate) fn finish_flush_l0_layer(
|
pub(crate) fn finish_flush_l0_layer(
|
||||||
&mut self,
|
&mut self,
|
||||||
delta_layer: Option<&ResidentLayer>,
|
delta_layer: &ResidentLayer,
|
||||||
frozen_layer_for_check: &Arc<InMemoryLayer>,
|
frozen_layer_for_check: &Arc<InMemoryLayer>,
|
||||||
metrics: &TimelineMetrics,
|
metrics: &TimelineMetrics,
|
||||||
) {
|
) {
|
||||||
@@ -179,12 +179,14 @@ impl LayerManager {
|
|||||||
// layer to disk at the same time, that would not work.
|
// layer to disk at the same time, that would not work.
|
||||||
assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
|
assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
|
||||||
|
|
||||||
if let Some(l) = delta_layer {
|
let mut updates = self.layer_map.batch_update();
|
||||||
let mut updates = self.layer_map.batch_update();
|
Self::insert_historic_layer(
|
||||||
Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
|
delta_layer.as_ref().clone(),
|
||||||
metrics.record_new_file_metrics(l.layer_desc().file_size);
|
&mut updates,
|
||||||
updates.flush();
|
&mut self.layer_fmgr,
|
||||||
}
|
);
|
||||||
|
metrics.record_new_file_metrics(delta_layer.layer_desc().file_size);
|
||||||
|
updates.flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Called when compaction is completed.
|
/// Called when compaction is completed.
|
||||||
|
|||||||
Reference in New Issue
Block a user