Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-05 12:32:54 +00:00
feat(pageserver): generate image layers for sparse keyspace (#7567)
Part of https://github.com/neondatabase/neon/issues/7462

The sparse keyspace does not generate image layers yet. This pull request adds support for generating image layers for the sparse keyspace.

## Summary of changes

* Use the scan interface to generate compaction data for the sparse keyspace.
* Track the number of delta layer reads during a scan.
* Read-triggered compaction: when a scan of the keyspace touches too many delta files, generate an image layer. There is one hard-coded threshold for now: the maximum number of delta layers we want to touch during a scan.
* L0 compaction does not need to compute holes for the metadata keyspace.

Known issue: the scan interface currently reads past the image layer, which causes `delta_layer_accessed` to keep increasing even after image layers are generated. The fix will come in a separate pull request, orthogonal to this one.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
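Below is a minimal, self-contained Rust sketch of the read-trigger heuristic described above. The `ScanStats` struct and the `main` driver are illustrative stand-ins, not the actual pageserver types; only the `MAX_AUX_FILE_V2_DELTAS` threshold mirrors the constant introduced in this diff.

```rust
/// Hard-coded threshold; mirrors MAX_AUX_FILE_V2_DELTAS from the diff below.
const MAX_AUX_FILE_V2_DELTAS: usize = 64;

/// Illustrative stand-in for the statistics the vectored read path accumulates.
#[derive(Default)]
struct ScanStats {
    layers_visited: u32,
    delta_layers_visited: u32,
}

impl ScanStats {
    /// Called once per layer the scan descends into.
    fn on_layer_visited(&mut self, is_delta: bool) {
        self.layers_visited += 1;
        if is_delta {
            self.delta_layers_visited += 1;
        }
    }

    /// Read trigger: a scan that touched too many delta layers asks the next
    /// compaction to materialize an image layer for the metadata keyspace.
    fn should_create_image_layer(&self) -> bool {
        self.delta_layers_visited as usize >= MAX_AUX_FILE_V2_DELTAS
    }
}

fn main() {
    let mut stats = ScanStats::default();
    // Pretend a scan crossed 70 delta layers and 2 image layers.
    for _ in 0..70 {
        stats.on_layer_visited(true);
    }
    for _ in 0..2 {
        stats.on_layer_visited(false);
    }
    assert!(stats.should_create_image_layer());
    println!(
        "layers={} deltas={} trigger={}",
        stats.layers_visited,
        stats.delta_layers_visited,
        stats.should_create_image_layer()
    );
}
```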
@@ -40,7 +40,11 @@ use utils::bin_ser::DeserializeError;
use utils::vec_map::{VecMap, VecMapOrdering};
use utils::{bin_ser::BeSer, lsn::Lsn};

const MAX_AUX_FILE_DELTAS: usize = 1024;
/// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
pub const MAX_AUX_FILE_DELTAS: usize = 1024;

/// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached.
pub const MAX_AUX_FILE_V2_DELTAS: usize = 64;

#[derive(Debug)]
pub enum LsnForTimestamp {
@@ -4777,7 +4777,12 @@ mod tests {
info!("Doing vectored read on {:?}", read);

let vectored_res = tline
.get_vectored_impl(read.clone(), reads_lsn, ValuesReconstructState::new(), &ctx)
.get_vectored_impl(
read.clone(),
reads_lsn,
&mut ValuesReconstructState::new(),
&ctx,
)
.await;
tline
.validate_get_vectored_impl(&vectored_res, read, reads_lsn, &ctx)

@@ -4826,7 +4831,7 @@ mod tests {
.get_vectored_impl(
aux_keyspace.clone(),
read_lsn,
ValuesReconstructState::new(),
&mut ValuesReconstructState::new(),
&ctx,
)
.await;
@@ -4971,7 +4976,7 @@ mod tests {
.get_vectored_impl(
read.clone(),
current_lsn,
ValuesReconstructState::new(),
&mut ValuesReconstructState::new(),
&ctx,
)
.await?;

@@ -5106,7 +5111,7 @@ mod tests {
ranges: vec![child_gap_at_key..child_gap_at_key.next()],
},
query_lsn,
ValuesReconstructState::new(),
&mut ValuesReconstructState::new(),
&ctx,
)
.await;
@@ -5547,7 +5552,7 @@ mod tests {
.await?;

const NUM_KEYS: usize = 1000;
const STEP: usize = 100; // random update + scan base_key + idx * STEP
const STEP: usize = 10000; // random update + scan base_key + idx * STEP

let cancel = CancellationToken::new();

@@ -5580,7 +5585,7 @@ mod tests {

let keyspace = KeySpace::single(base_key..base_key.add((NUM_KEYS * STEP) as u32));

for _ in 0..10 {
for iter in 0..=10 {
// Read all the blocks
for (blknum, last_lsn) in updated.iter().enumerate() {
test_key.field6 = (blknum * STEP) as u32;

@@ -5595,7 +5600,7 @@ mod tests {
.get_vectored_impl(
keyspace.clone(),
lsn,
ValuesReconstructState::default(),
&mut ValuesReconstructState::default(),
&ctx,
)
.await?
@@ -5631,17 +5636,91 @@ mod tests {
updated[blknum] = lsn;
}

// Perform a cycle of flush, compact, and GC
tline.freeze_and_flush().await?;
tline.compact(&cancel, EnumSet::empty(), &ctx).await?;
tenant
.gc_iteration(Some(tline.timeline_id), 0, Duration::ZERO, &cancel, &ctx)
.await?;
// Perform two cycles of flush, compact, and GC
for round in 0..2 {
tline.freeze_and_flush().await?;
tline
.compact(
&cancel,
if iter % 5 == 0 && round == 0 {
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceImageLayerCreation);
flags.insert(CompactFlags::ForceRepartition);
flags
} else {
EnumSet::empty()
},
&ctx,
)
.await?;
tenant
.gc_iteration(Some(tline.timeline_id), 0, Duration::ZERO, &cancel, &ctx)
.await?;
}
}

Ok(())
}

#[tokio::test]
async fn test_metadata_compaction_trigger() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_metadata_compaction_trigger")?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;

let cancel = CancellationToken::new();

let mut base_key = Key::from_hex("000000000033333333444444445500000000").unwrap();
base_key.field1 = AUX_KEY_PREFIX;
let test_key = base_key;
let mut lsn = Lsn(0x10);

for _ in 0..20 {
lsn = Lsn(lsn.0 + 0x10);
let mut writer = tline.writer().await;
writer
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", 0, lsn))),
&ctx,
)
.await?;
writer.finish_write(lsn);
drop(writer);
tline.freeze_and_flush().await?; // force create a delta layer
}

let before_num_l0_delta_files = tline
.layers
.read()
.await
.layer_map()
.get_level0_deltas()?
.len();

tline.compact(&cancel, EnumSet::empty(), &ctx).await?;

let after_num_l0_delta_files = tline
.layers
.read()
.await
.layer_map()
.get_level0_deltas()?
.len();

assert!(after_num_l0_delta_files < before_num_l0_delta_files, "after_num_l0_delta_files={after_num_l0_delta_files}, before_num_l0_delta_files={before_num_l0_delta_files}");

assert_eq!(
tline.get(test_key, lsn, &ctx).await?,
test_img(&format!("{} at {}", 0, lsn))
);

Ok(())
}

#[tokio::test]
async fn test_branch_copies_dirty_aux_file_flag() {
let harness = TenantHarness::create("test_branch_copies_dirty_aux_file_flag").unwrap();
@@ -113,12 +113,17 @@ impl From<VectoredValueReconstructState> for ValueReconstructState {
}
}

/// Bag of data accumulated during a vectored get
/// Bag of data accumulated during a vectored get.
pub(crate) struct ValuesReconstructState {
/// The keys will be removed after `get_vectored` completes. The caller outside `Timeline`
/// should not expect to get anything from this hashmap.
pub(crate) keys: HashMap<Key, Result<VectoredValueReconstructState, PageReconstructError>>,

keys_done: KeySpaceRandomAccum,

// Statistics that are still accessible as a caller of `get_vectored_impl`.
layers_visited: u32,
delta_layers_visited: u32,
}

impl ValuesReconstructState {

@@ -127,6 +132,7 @@ impl ValuesReconstructState {
keys: HashMap::new(),
keys_done: KeySpaceRandomAccum::new(),
layers_visited: 0,
delta_layers_visited: 0,
}
}

@@ -140,8 +146,17 @@ impl ValuesReconstructState {
}
}

pub(crate) fn on_layer_visited(&mut self) {
pub(crate) fn on_layer_visited(&mut self, layer: &ReadableLayer) {
self.layers_visited += 1;
if let ReadableLayer::PersistentLayer(layer) = layer {
if layer.layer_desc().is_delta() {
self.delta_layers_visited += 1;
}
}
}

pub(crate) fn get_delta_layers_visited(&self) -> u32 {
self.delta_layers_visited
}

pub(crate) fn get_layers_visited(&self) -> u32 {
@@ -18,8 +18,8 @@ use fail::fail_point;
use once_cell::sync::Lazy;
use pageserver_api::{
key::{
AUX_FILES_KEY, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX, NON_INHERITED_RANGE,
NON_INHERITED_SPARSE_RANGE,
AUX_FILES_KEY, KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX,
NON_INHERITED_RANGE, NON_INHERITED_SPARSE_RANGE,
},
keyspace::{KeySpaceAccum, SparseKeyPartitioning},
models::{

@@ -60,7 +60,6 @@ use std::{
ops::ControlFlow,
};

use crate::tenant::timeline::init::LocalLayerFileMetadata;
use crate::{
aux_file::AuxFileSizeEstimator,
tenant::{

@@ -89,6 +88,9 @@ use crate::{
metrics::ScanLatencyOngoingRecording, tenant::timeline::logical_size::CurrentLogicalSize,
};
use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
use crate::{
pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS, tenant::timeline::init::LocalLayerFileMetadata,
};
use crate::{
pgdatadir_mapping::{AuxFilesDirectory, DirectoryKind},
virtual_file::{MaybeFatalIo, VirtualFile},
@@ -782,6 +784,11 @@ pub(crate) enum ShutdownMode {
Hard,
}

struct ImageLayerCreationOutcome {
image: Option<ResidentLayer>,
next_start_key: Key,
}

/// Public interface functions
impl Timeline {
/// Get the LSN where this branch was created

@@ -883,7 +890,7 @@ impl Timeline {
}

let vectored_res = self
.get_vectored_impl(keyspace.clone(), lsn, reconstruct_state, ctx)
.get_vectored_impl(keyspace.clone(), lsn, &mut reconstruct_state, ctx)
.await;

if self.conf.validate_vectored_get {

@@ -1028,7 +1035,12 @@ impl Timeline {
}
GetVectoredImpl::Vectored => {
let vectored_res = self
.get_vectored_impl(keyspace.clone(), lsn, ValuesReconstructState::new(), ctx)
.get_vectored_impl(
keyspace.clone(),
lsn,
&mut ValuesReconstructState::new(),
ctx,
)
.await;

if self.conf.validate_vectored_get {
@@ -1116,7 +1128,7 @@ impl Timeline {
.get_vectored_impl(
keyspace.clone(),
lsn,
ValuesReconstructState::default(),
&mut ValuesReconstructState::default(),
ctx,
)
.await;

@@ -1193,7 +1205,7 @@ impl Timeline {
&self,
keyspace: KeySpace,
lsn: Lsn,
mut reconstruct_state: ValuesReconstructState,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
let get_kind = if keyspace.total_raw_size() == 1 {

@@ -1205,7 +1217,7 @@ impl Timeline {
let get_data_timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME
.for_get_kind(get_kind)
.start_timer();
self.get_vectored_reconstruct_data(keyspace, lsn, &mut reconstruct_state, ctx)
self.get_vectored_reconstruct_data(keyspace, lsn, reconstruct_state, ctx)
.await?;
get_data_timer.stop_and_record();

@@ -1214,7 +1226,8 @@ impl Timeline {
.start_timer();
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
let layers_visited = reconstruct_state.get_layers_visited();
for (key, res) in reconstruct_state.keys {

for (key, res) in std::mem::take(&mut reconstruct_state.keys) {
match res {
Err(err) => {
results.insert(key, Err(err));

@@ -3448,7 +3461,7 @@ impl Timeline {
unmapped_keyspace = keyspace_to_read;
cont_lsn = next_cont_lsn;

reconstruct_state.on_layer_visited();
reconstruct_state.on_layer_visited(&layer_to_read);
} else {
break;
}
@@ -4134,6 +4147,176 @@ impl Timeline {
false
}

/// Create image layers for Postgres data. Assumes the caller passes a partition that is not too large,
/// so that at most one image layer will be produced from this function.
async fn create_image_layer_for_rel_blocks(
self: &Arc<Self>,
partition: &KeySpace,
mut image_layer_writer: ImageLayerWriter,
lsn: Lsn,
ctx: &RequestContext,
img_range: Range<Key>,
start: Key,
) -> Result<ImageLayerCreationOutcome, CreateImageLayersError> {
let mut wrote_keys = false;

let mut key_request_accum = KeySpaceAccum::new();
for range in &partition.ranges {
let mut key = range.start;
while key < range.end {
// Decide whether to retain this key: usually we do, but sharded tenants may
// need to drop keys that don't belong to them. If we retain the key, add it
// to `key_request_accum` for later issuing a vectored get
if self.shard_identity.is_key_disposable(&key) {
debug!(
"Dropping key {} during compaction (it belongs on shard {:?})",
key,
self.shard_identity.get_shard_number(&key)
);
} else {
key_request_accum.add_key(key);
}

let last_key_in_range = key.next() == range.end;
key = key.next();

// Maybe flush `key_rest_accum`
if key_request_accum.raw_size() >= Timeline::MAX_GET_VECTORED_KEYS
|| (last_key_in_range && key_request_accum.raw_size() > 0)
{
let results = self
.get_vectored(key_request_accum.consume_keyspace(), lsn, ctx)
.await?;

for (img_key, img) in results {
let img = match img {
Ok(img) => img,
Err(err) => {
// If we fail to reconstruct a VM or FSM page, we can zero the
// page without losing any actual user data. That seems better
// than failing repeatedly and getting stuck.
//
// We had a bug at one point, where we truncated the FSM and VM
// in the pageserver, but the Postgres didn't know about that
// and continued to generate incremental WAL records for pages
// that didn't exist in the pageserver. Trying to replay those
// WAL records failed to find the previous image of the page.
// This special case allows us to recover from that situation.
// See https://github.com/neondatabase/neon/issues/2601.
//
// Unfortunately we cannot do this for the main fork, or for
// any metadata keys, keys, as that would lead to actual data
// loss.
if is_rel_fsm_block_key(img_key) || is_rel_vm_block_key(img_key) {
warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
ZERO_PAGE.clone()
} else {
return Err(CreateImageLayersError::PageReconstructError(err));
}
}
};

// Write all the keys we just read into our new image layer.
image_layer_writer.put_image(img_key, img, ctx).await?;
wrote_keys = true;
}
}
}
}

if wrote_keys {
// Normal path: we have written some data into the new image layer for this
// partition, so flush it to disk.
let image_layer = image_layer_writer.finish(self, ctx).await?;
Ok(ImageLayerCreationOutcome {
image: Some(image_layer),
next_start_key: img_range.end,
})
} else {
// Special case: the image layer may be empty if this is a sharded tenant and the
// partition does not cover any keys owned by this shard. In this case, to ensure
// we don't leave gaps between image layers, leave `start` where it is, so that the next
// layer we write will cover the key range that we just scanned.
tracing::debug!("no data in range {}-{}", img_range.start, img_range.end);
Ok(ImageLayerCreationOutcome {
image: None,
next_start_key: start,
})
}
}

/// Create an image layer for metadata keys. This function produces one image layer for all metadata
/// keys for now. Because metadata keys cannot exceed basebackup size limit, the image layer for it
/// would not be too large to fit in a single image layer.
#[allow(clippy::too_many_arguments)]
async fn create_image_layer_for_metadata_keys(
self: &Arc<Self>,
partition: &KeySpace,
mut image_layer_writer: ImageLayerWriter,
lsn: Lsn,
ctx: &RequestContext,
img_range: Range<Key>,
mode: ImageLayerCreationMode,
) -> Result<ImageLayerCreationOutcome, CreateImageLayersError> {
assert!(!matches!(mode, ImageLayerCreationMode::Initial));

// Metadata keys image layer creation.
let mut reconstruct_state = ValuesReconstructState::default();
let data = self
.get_vectored_impl(partition.clone(), lsn, &mut reconstruct_state, ctx)
.await?;
let (data, total_kb_retrieved, total_key_retrieved) = {
let mut new_data = BTreeMap::new();
let mut total_kb_retrieved = 0;
let mut total_key_retrieved = 0;
for (k, v) in data {
let v = v.map_err(CreateImageLayersError::PageReconstructError)?;
total_kb_retrieved += KEY_SIZE + v.len();
total_key_retrieved += 1;
new_data.insert(k, v);
}
(new_data, total_kb_retrieved / 1024, total_key_retrieved)
};
let delta_file_accessed = reconstruct_state.get_delta_layers_visited();

let trigger_generation = delta_file_accessed as usize >= MAX_AUX_FILE_V2_DELTAS;
info!(
"generate image layers for metadata keys: trigger_generation={trigger_generation}, \
delta_file_accessed={delta_file_accessed}, total_kb_retrieved={total_kb_retrieved}, \
total_key_retrieved={total_key_retrieved}"
);
if !trigger_generation && mode == ImageLayerCreationMode::Try {
return Ok(ImageLayerCreationOutcome {
image: None,
next_start_key: img_range.end,
});
}
let has_keys = !data.is_empty();
for (k, v) in data {
// Even if the value is empty (deleted), we do not delete it for now until we can ensure vectored get
// considers this situation properly.
// if v.is_empty() {
// continue;
// }

// No need to handle sharding b/c metadata keys are always on the 0-th shard.

// TODO: split image layers to avoid too large layer files. Too large image files are not handled
// on the normal data path either.
image_layer_writer.put_image(k, v, ctx).await?;
}
Ok(ImageLayerCreationOutcome {
image: if has_keys {
let image_layer = image_layer_writer.finish(self, ctx).await?;
Some(image_layer)
} else {
tracing::debug!("no data in range {}-{}", img_range.start, img_range.end);
None
},
next_start_key: img_range.end,
})
}

#[tracing::instrument(skip_all, fields(%lsn, %mode))]
async fn create_image_layers(
self: &Arc<Timeline>,
@@ -4175,19 +4358,17 @@ impl Timeline {

for partition in partitioning.parts.iter() {
let img_range = start..partition.ranges.last().unwrap().end;

if partition.overlaps(&Key::metadata_key_range()) {
// TODO(chi): The next patch will correctly create image layers for metadata keys, and it would be a
// rather big change. Keep this patch small for now.
match mode {
ImageLayerCreationMode::Force | ImageLayerCreationMode::Try => {
// skip image layer creation anyways for metadata keys.
start = img_range.end;
continue;
}
ImageLayerCreationMode::Initial => {
return Err(CreateImageLayersError::Other(anyhow::anyhow!("no image layer should be created for metadata keys when flushing frozen layers")));
}
let compact_metadata = partition.overlaps(&Key::metadata_key_range());
if compact_metadata {
for range in &partition.ranges {
assert!(
range.start.field1 >= METADATA_KEY_BEGIN_PREFIX
&& range.end.field1 <= METADATA_KEY_END_PREFIX,
"metadata keys must be partitioned separately"
);
}
if mode == ImageLayerCreationMode::Initial {
return Err(CreateImageLayersError::Other(anyhow::anyhow!("no image layer should be created for metadata keys when flushing frozen layers")));
}
} else if let ImageLayerCreationMode::Try = mode {
// check_for_image_layers = false -> skip

@@ -4198,7 +4379,7 @@ impl Timeline {
}
}

let mut image_layer_writer = ImageLayerWriter::new(
let image_layer_writer = ImageLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
@@ -4214,87 +4395,39 @@ impl Timeline {
)))
});

let mut wrote_keys = false;
if !compact_metadata {
let ImageLayerCreationOutcome {
image,
next_start_key,
} = self
.create_image_layer_for_rel_blocks(
partition,
image_layer_writer,
lsn,
ctx,
img_range,
start,
)
.await?;

let mut key_request_accum = KeySpaceAccum::new();
for range in &partition.ranges {
let mut key = range.start;
while key < range.end {
// Decide whether to retain this key: usually we do, but sharded tenants may
// need to drop keys that don't belong to them. If we retain the key, add it
// to `key_request_accum` for later issuing a vectored get
if self.shard_identity.is_key_disposable(&key) {
debug!(
"Dropping key {} during compaction (it belongs on shard {:?})",
key,
self.shard_identity.get_shard_number(&key)
);
} else {
key_request_accum.add_key(key);
}

let last_key_in_range = key.next() == range.end;
key = key.next();

// Maybe flush `key_rest_accum`
if key_request_accum.raw_size() >= Timeline::MAX_GET_VECTORED_KEYS
|| (last_key_in_range && key_request_accum.raw_size() > 0)
{
let results = self
.get_vectored(key_request_accum.consume_keyspace(), lsn, ctx)
.await?;

for (img_key, img) in results {
let img = match img {
Ok(img) => img,
Err(err) => {
// If we fail to reconstruct a VM or FSM page, we can zero the
// page without losing any actual user data. That seems better
// than failing repeatedly and getting stuck.
//
// We had a bug at one point, where we truncated the FSM and VM
// in the pageserver, but the Postgres didn't know about that
// and continued to generate incremental WAL records for pages
// that didn't exist in the pageserver. Trying to replay those
// WAL records failed to find the previous image of the page.
// This special case allows us to recover from that situation.
// See https://github.com/neondatabase/neon/issues/2601.
//
// Unfortunately we cannot do this for the main fork, or for
// any metadata keys, keys, as that would lead to actual data
// loss.
if is_rel_fsm_block_key(img_key) || is_rel_vm_block_key(img_key)
{
warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
ZERO_PAGE.clone()
} else {
return Err(CreateImageLayersError::PageReconstructError(
err,
));
}
}
};

// Write all the keys we just read into our new image layer.
image_layer_writer.put_image(img_key, img, ctx).await?;
wrote_keys = true;
}
}
}
}

if wrote_keys {
// Normal path: we have written some data into the new image layer for this
// partition, so flush it to disk.
start = img_range.end;
let image_layer = image_layer_writer.finish(self, ctx).await?;
image_layers.push(image_layer);
start = next_start_key;
image_layers.extend(image);
} else {
// Special case: the image layer may be empty if this is a sharded tenant and the
// partition does not cover any keys owned by this shard. In this case, to ensure
// we don't leave gaps between image layers, leave `start` where it is, so that the next
// layer we write will cover the key range that we just scanned.
tracing::debug!("no data in range {}-{}", img_range.start, img_range.end);
let ImageLayerCreationOutcome {
image,
next_start_key,
} = self
.create_image_layer_for_metadata_keys(
partition,
image_layer_writer,
lsn,
ctx,
img_range,
mode,
)
.await?;
start = next_start_key;
image_layers.extend(image);
}
}

@@ -116,9 +116,13 @@ impl Timeline {

// 3. Create new image layers for partitions that have been modified
// "enough".
let dense_layers = self
let mut partitioning = dense_partitioning;
partitioning
.parts
.extend(sparse_partitioning.into_dense().parts);
let image_layers = self
.create_image_layers(
&dense_partitioning,
&partitioning,
lsn,
if flags.contains(CompactFlags::ForceImageLayerCreation) {
ImageLayerCreationMode::Force

@@ -130,24 +134,8 @@ impl Timeline {
.await
.map_err(anyhow::Error::from)?;

// For now, nothing will be produced...
let sparse_layers = self
.create_image_layers(
&sparse_partitioning.clone().into_dense(),
lsn,
if flags.contains(CompactFlags::ForceImageLayerCreation) {
ImageLayerCreationMode::Force
} else {
ImageLayerCreationMode::Try
},
&image_ctx,
)
.await
.map_err(anyhow::Error::from)?;
assert!(sparse_layers.is_empty());

self.upload_new_image_layers(dense_layers)?;
dense_partitioning.parts.len()
self.upload_new_image_layers(image_layers)?;
partitioning.parts.len()
}
Err(err) => {
// no partitioning? This is normal, if the timeline was just created
@@ -499,8 +487,11 @@ impl Timeline {

for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
if let Some(prev_key) = prev {
// just first fast filter
if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
// just first fast filter, do not create hole entries for metadata keys. The last hole in the
// compaction is the gap between data key and metadata keys.
if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
&& !Key::is_metadata_key(&prev_key)
{
let key_range = prev_key..next_key;
// Measuring hole by just subtraction of i128 representation of key range boundaries
// has not so much sense, because largest holes will corresponds field1/field2 changes.