compaction: integration

Joonas Koivunen
2023-08-16 12:25:07 +03:00
parent 82a955ebfe
commit 975e1558cc
2 changed files with 84 additions and 197 deletions


@@ -1028,7 +1028,7 @@ async fn timeline_compact_handler(
timeline
.compact(&cancel, &ctx)
.await
.map_err(ApiError::InternalServerError)?;
.map_err(|e| ApiError::InternalServerError(e.into()))?;
json_response(StatusCode::OK, ())
}
.instrument(info_span!("manual_compaction", %tenant_id, %timeline_id))
@@ -1053,7 +1053,7 @@ async fn timeline_checkpoint_handler(
timeline
.compact(&cancel, &ctx)
.await
.map_err(ApiError::InternalServerError)?;
.map_err(|e| ApiError::InternalServerError(e.into()))?;
json_response(StatusCode::OK, ())
}
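
Both handlers convert the error in a closure because `compact` now returns the typed `CompactionError` introduced later in this diff rather than a bare `anyhow::Error`. A minimal sketch of that conversion, with `ApiError` reduced to a stand-in for the pageserver's HTTP error type (an assumption based on how the handlers use it):

```rust
use anyhow::Error as AnyhowError;

// Stand-in for the pageserver's HTTP error type; assumed to wrap an anyhow::Error.
enum ApiError {
    InternalServerError(AnyhowError),
}

// Mirrors the CompactionError defined later in this commit.
#[derive(Debug, thiserror::Error)]
enum CompactionError {
    #[error("The timeline or pageserver is shutting down")]
    ShuttingDown,
    #[error(transparent)]
    Other(#[from] AnyhowError),
}

fn to_api_error(e: CompactionError) -> ApiError {
    // thiserror gives CompactionError a std::error::Error impl, so `e.into()`
    // produces the anyhow::Error that InternalServerError wraps.
    ApiError::InternalServerError(e.into())
}
```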


@@ -626,9 +626,7 @@ impl Timeline {
self: &Arc<Self>,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<()> {
const ROUNDS: usize = 2;
) -> Result<(), CompactionError> {
static CONCURRENT_COMPACTIONS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
once_cell::sync::Lazy::new(|| {
let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
@@ -670,87 +668,6 @@ impl Timeline {
return Ok(());
}
// retry two times to allow the first round to find layers which need to be downloaded, then
// download them, and retry compaction in the second round
for round in 0..ROUNDS {
// should we error out with the most specific error?
let last_round = round == ROUNDS - 1;
let res = self.compact_inner(ctx).await;
// If `create_image_layers` or `compact_level0` scheduled any
// uploads or deletions, but didn't update the index file yet,
// do it now.
//
// This isn't necessary for correctness, the remote state is
// consistent without the uploads and deletions, and we would
// update the index file on next flush iteration too. But it
// could take a while until that happens.
//
// Additionally, only do this once before we return from this function.
if last_round || res.is_ok() {
if let Some(remote_client) = &self.remote_client {
remote_client.schedule_index_upload_for_file_changes()?;
}
}
let rls = match res {
Ok(()) => return Ok(()),
Err(CompactionError::DownloadRequired(rls)) if !last_round => {
// this can be done at most once before exiting; the downloads are awaited below
rls
}
Err(CompactionError::DownloadRequired(rls)) => {
anyhow::bail!("Compaction requires downloading multiple times (last was {} layers), possibly battling against eviction", rls.len())
}
Err(CompactionError::ShuttingDown) => {
return Ok(());
}
Err(CompactionError::Other(e)) => {
return Err(e);
}
};
// this path can be visited in the second round of retrying, if the first one found that we
// must first download some remote layers
let total = rls.len();
let mut downloads = rls
.into_iter()
.map(|rl| self.download_remote_layer(rl))
.collect::<futures::stream::FuturesUnordered<_>>();
let mut failed = 0;
loop {
tokio::select! {
_ = cancel.cancelled() => anyhow::bail!("Cancelled while downloading remote layers"),
res = downloads.next() => {
match res {
Some(Ok(())) => {},
Some(Err(e)) => {
warn!("Downloading remote layer for compaction failed: {e:#}");
failed += 1;
}
None => break,
}
}
}
}
if failed != 0 {
anyhow::bail!("{failed} out of {total} layers failed to download, retrying later");
}
// if everything downloaded fine, let's try again
}
unreachable!("retry loop exits")
}
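
The loop removed here combines two patterns: racing concurrent layer downloads against the cancellation token, and counting failures so the whole batch can be retried later. In isolation the pattern looks roughly like the sketch below (types simplified; the download futures stand in for `Timeline::download_remote_layer`):

```rust
use futures::stream::{FuturesUnordered, StreamExt};
use tokio_util::sync::CancellationToken;

/// Sketch only: drive a batch of downloads concurrently, bail if cancelled,
/// and report how many failed so the caller can retry the batch later.
async fn download_all<Fut>(
    downloads: impl IntoIterator<Item = Fut>,
    cancel: &CancellationToken,
) -> anyhow::Result<()>
where
    Fut: std::future::Future<Output = anyhow::Result<()>>,
{
    let mut pending: FuturesUnordered<Fut> = downloads.into_iter().collect();
    let total = pending.len();
    let mut failed = 0;
    loop {
        tokio::select! {
            _ = cancel.cancelled() => anyhow::bail!("cancelled while downloading remote layers"),
            res = pending.next() => match res {
                Some(Ok(())) => {}
                Some(Err(e)) => {
                    tracing::warn!("downloading remote layer failed: {e:#}");
                    failed += 1;
                }
                None => break,
            },
        }
    }
    if failed != 0 {
        anyhow::bail!("{failed} out of {total} layers failed to download, retrying later");
    }
    Ok(())
}
```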
/// Compaction which might need to be retried after downloading remote layers.
async fn compact_inner(self: &Arc<Self>, ctx: &RequestContext) -> Result<(), CompactionError> {
//
// High level strategy for compaction / image creation:
//
// 1. First, calculate the desired "partitioning" of the
@@ -795,6 +712,7 @@ impl Timeline {
// Define partitioning schema if needed
// FIXME: the match should only cover repartitioning, not the next steps
match self
.repartition(
self.get_last_record_lsn(),
@@ -2876,28 +2794,18 @@ impl Timeline {
#[derive(Default)]
struct CompactLevel0Phase1Result {
new_layers: Vec<Arc<DeltaLayer>>,
deltas_to_compact: Vec<Arc<PersistentLayerDesc>>,
new_layers: Vec<ResidentLayer>,
deltas_to_compact: Vec<Arc<LayerE>>,
}
/// Top-level failure to compact.
#[derive(Debug)]
#[derive(Debug, thiserror::Error)]
enum CompactionError {
/// L0 compaction requires layers to be downloaded.
///
/// This should not happen repeatedly, but will be retried once by top-level
/// `Timeline::compact`.
DownloadRequired(Vec<Arc<RemoteLayer>>),
/// The timeline or pageserver is shutting down
#[error("The timeline or pageserver is shutting down")]
ShuttingDown,
/// Compaction cannot be done right now; page reconstruction and so on.
Other(anyhow::Error),
}
impl From<anyhow::Error> for CompactionError {
fn from(value: anyhow::Error) -> Self {
CompactionError::Other(value)
}
#[error(transparent)]
Other(#[from] anyhow::Error),
}
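
With `#[from] anyhow::Error` in place, `?` on `anyhow::Result` call sites inside compaction converts automatically into `CompactionError::Other`, while callers can still match `ShuttingDown` explicitly. A small illustration against the enum defined above (the helper names are made up):

```rust
// Assumed helper: some fallible step that still reports errors via anyhow.
fn write_layer_files() -> anyhow::Result<()> {
    Ok(())
}

fn compact_step() -> Result<(), CompactionError> {
    // `?` converts anyhow::Error into CompactionError::Other via #[from].
    write_layer_files()?;
    Ok(())
}

fn run_compaction() {
    match compact_step() {
        Ok(()) => {}
        // Shutdown is not worth logging loudly; just stop.
        Err(CompactionError::ShuttingDown) => {}
        Err(CompactionError::Other(e)) => tracing::error!("compaction failed: {e:#}"),
    }
}
```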
#[serde_as]
@@ -3022,13 +2930,11 @@ impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
}
impl Timeline {
/// Level0 files first phase of compaction, explained in the [`compact_inner`] comment.
/// Level0 files first phase of compaction, explained in the [`Self::compact`] comment.
///
/// This method takes the `_layer_removal_cs` guard to highlight that required downloads are
/// returned as an error. If the `layer_removal_cs` boundary is changed not to be taken in the
/// start of level0 files compaction, the on-demand download should be revisited as well.
///
/// [`compact_inner`]: Self::compact_inner
async fn compact_level0_phase1(
self: &Arc<Self>,
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
@@ -3073,19 +2979,32 @@ impl Timeline {
// size length. Compaction will likely create the same set of n files afterwards.
//
// This failpoint is a superset of both of the cases.
fail_point!("compact-level0-phase1-return-same", |_| {
println!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
Ok(CompactLevel0Phase1Result {
new_layers: level0_deltas
.iter()
.map(|x| x.clone().downcast_delta_layer().unwrap())
.collect(),
deltas_to_compact: level0_deltas
.iter()
.map(|x| x.layer_desc().clone().into())
.collect(),
})
});
if cfg!(feature = "testing") {
// FIXME: utils does not depend on `fail` so there's no non-macro answer to this
let active = (|| {
::fail::fail_point!("compact-level0-phase1-return-same", |_| true);
false
})();
if active {
let mut new_layers = Vec::with_capacity(level0_deltas.len());
for delta in &level0_deltas {
// we are just faking these layers as being produced again for this failpoint
new_layers.push(
delta
.guard_against_eviction(true)
.await
.context("download layer for failpoint")?,
);
}
// FIXME: println
println!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
return Ok(CompactLevel0Phase1Result {
new_layers,
deltas_to_compact: level0_deltas,
});
}
}
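
This block only compiles with the `testing` feature, and the failpoint still has to be armed through the `fail` crate: either programmatically as sketched below, or via the `FAILPOINTS` environment variable (`FAILPOINTS=compact-level0-phase1-return-same=return`) when the process sets up a `fail::FailScenario`. The helper name is made up; how the test suite actually arms failpoints is not shown in this diff:

```rust
/// Sketch only: arm the failpoint so compact_level0_phase1 returns the
/// input L0 deltas unchanged, then clear it again afterwards.
fn with_compaction_failpoint(f: impl FnOnce()) {
    fail::cfg("compact-level0-phase1-return-same", "return").unwrap();
    f();
    fail::remove("compact-level0-phase1-return-same");
}
```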
// Gather the files to compact in this iteration.
//
@@ -3105,14 +3024,15 @@ impl Timeline {
let first_level0_delta = level0_deltas_iter.next().unwrap();
let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
let mut deltas_to_compact = vec![Arc::clone(first_level0_delta)];
let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
deltas_to_compact.push(first_level0_delta.clone());
for l in level0_deltas_iter {
let lsn_range = &l.layer_desc().lsn_range;
if lsn_range.start != prev_lsn_end {
break;
}
deltas_to_compact.push(Arc::clone(l));
deltas_to_compact.push(l.clone());
prev_lsn_end = lsn_range.end;
}
let lsn_range = Range {
@@ -3125,24 +3045,6 @@ impl Timeline {
end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
};
let remotes = deltas_to_compact
.iter()
.filter(|l| l.is_remote_layer())
.inspect(|l| info!("compact requires download of {l}"))
.map(|l| {
l.clone()
.downcast_remote_layer()
.expect("just checked it is remote layer")
})
.collect::<Vec<_>>();
if !remotes.is_empty() {
// the caller is holding the layer_removal_cs lock, and we don't want to download while
// holding it; in the future, download_remote_layer might take it as well. this applies
// regardless of earlier image creation downloading on-demand while holding the lock.
return Err(CompactionError::DownloadRequired(remotes));
}
info!(
"Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
lsn_range.start,
@@ -3175,23 +3077,19 @@ impl Timeline {
let mut all_keys = Vec::new();
let downcast_deltas: Vec<_> = deltas_to_compact
.iter()
.map(|l| l.clone().downcast_delta_layer().expect("delta layer"))
.collect();
for dl in downcast_deltas.iter() {
// TODO: replace this with an await once we fully go async
all_keys.extend(DeltaLayer::load_keys(dl, ctx).await?);
for l in deltas_to_compact.iter() {
all_keys.extend(l.load_keys(ctx).await?);
}
// FIXME: should spawn_blocking the rest of this function
// The current stdlib sorting implementation is particularly fast when the
// slice is made up of sorted sub-ranges.
all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
for DeltaEntry { key: next_key, .. } in all_keys.iter() {
let next_key = *next_key;
for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
if let Some(prev_key) = prev {
// just a first, fast filter
if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
@@ -3351,9 +3249,12 @@ impl Timeline {
|| contains_hole
{
// ... if so, flush previous layer and prepare to write new one
new_layers.push(Arc::new(
writer.take().unwrap().finish(prev_key.unwrap().next())?,
));
new_layers.push(
writer
.take()
.unwrap()
.finish(prev_key.unwrap().next(), self)?,
);
writer = None;
if contains_hole {
@@ -3393,7 +3294,7 @@ impl Timeline {
prev_key = Some(key);
}
if let Some(writer) = writer {
new_layers.push(Arc::new(writer.finish(prev_key.unwrap().next())?));
new_layers.push(writer.finish(prev_key.unwrap().next(), &self)?);
}
// Sync layers
@@ -3411,10 +3312,17 @@ impl Timeline {
);
}
}
let mut layer_paths: Vec<PathBuf> = new_layers.iter().map(|l| l.path()).collect();
// FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
let mut layer_paths: Vec<PathBuf> = new_layers
.iter()
.map(|l| l.local_path().to_owned())
.collect();
// Fsync all the layer files and directory using multiple threads to
// minimize latency.
//
// FIXME: spawn_blocking above for this
par_fsync::par_fsync(&layer_paths).context("fsync all new layers")?;
par_fsync::par_fsync(&[self.conf.timeline_path(&self.tenant_id, &self.timeline_id)])
@@ -3443,10 +3351,7 @@ impl Timeline {
Ok(CompactLevel0Phase1Result {
new_layers,
deltas_to_compact: deltas_to_compact
.into_iter()
.map(|x| Arc::new(x.layer_desc().clone()))
.collect(),
deltas_to_compact,
})
}
@@ -3507,7 +3412,6 @@ impl Timeline {
}
let mut guard = self.layers.write().await;
let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
// In some rare cases, we may generate a file with exactly the same key range / LSN as before the compaction.
// We should move to numbering the layer files instead of naming them using key range / LSN some day. But for
@@ -3515,63 +3419,45 @@ impl Timeline {
let mut duplicated_layers = HashSet::new();
let mut insert_layers = Vec::new();
let mut remove_layers = Vec::new();
for l in new_layers {
let new_delta_path = l.path();
let metadata = new_delta_path.metadata().with_context(|| {
format!(
"read file metadata for new created layer {}",
new_delta_path.display()
)
})?;
if let Some(remote_client) = &self.remote_client {
remote_client.schedule_layer_file_upload(
&l.filename(),
&LayerFileMetadata::new(metadata.len()),
)?;
let m = LayerFileMetadata::new(l.layer_desc().file_size);
// upload even if duplicated, because we may have changed the contents
remote_client.schedule_layer_file_upload(l.clone(), &m)?;
}
// update the timeline's physical size
self.metrics
.resident_physical_size_gauge
.add(metadata.len());
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
l.access_stats().record_residence_event(
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
let l = l as Arc<dyn PersistentLayer>;
if guard.contains(&l) {
if guard.contains(l.as_ref()) {
duplicated_layers.insert(l.layer_desc().key());
} else if LayerMap::is_l0(l.layer_desc()) {
return Err(CompactionError::Other(anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction.")));
} else {
if LayerMap::is_l0(l.layer_desc()) {
return Err(CompactionError::Other(anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction.")));
}
// update the timeline's physical size
self.metrics
.resident_physical_size_gauge
.add(l.layer_desc().file_size);
insert_layers.push(l);
}
}
let remove_layers = {
let mut deltas_to_compact = deltas_to_compact;
// only remove those inputs which were not outputs
deltas_to_compact.retain(|l| !duplicated_layers.contains(&l.layer_desc().key()));
deltas_to_compact
};
// Now that we have reshuffled the data to set of new delta layers, we can
// delete the old ones
let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len());
for ldesc in deltas_to_compact {
if duplicated_layers.contains(&ldesc.key()) {
// skip duplicated layers, they will not be removed; we have already overwritten them
// with new layers in the compaction phase 1.
continue;
}
layer_names_to_delete.push(ldesc.filename());
remove_layers.push(guard.get_from_desc(&ldesc));
let mut layer_names_to_delete = Vec::with_capacity(remove_layers.len());
for delta in &remove_layers {
layer_names_to_delete.push(delta.layer_desc().filename());
}
guard.finish_compact_l0(
&layer_removal_cs,
remove_layers,
insert_layers,
&insert_layers,
&self.metrics,
)?;
@@ -3579,6 +3465,7 @@ impl Timeline {
// Also schedule the deletions in remote storage
if let Some(remote_client) = &self.remote_client {
// FIXME: this needs to be moved to LayerE::drop possibly?
remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?;
}