Compare commits

...

7 Commits

Author SHA1 Message Date
John Spray
f8b31016f3 pageserver: re-order wait_completion in GC 2023-10-24 12:04:44 +01:00
John Spray
6a2d616f4d pageserver: on failed upload, remove layer from latest_files 2023-10-24 12:03:28 +01:00
John Spray
ce782f2fa6 Upgrade an info! to an error! for a critical issue. 2023-10-24 12:03:28 +01:00
John Spray
b7ff5cfca7 tests: fix test_compaction_delete_before_upload 2023-10-24 11:59:29 +01:00
John Spray
96afb8327b Update local disk_consistent_lsn before remote 2023-10-24 11:15:42 +01:00
John Spray
1208266716 pageserver: schedule frozen layer uploads inside layers lock 2023-10-24 10:42:00 +01:00
John Spray
84d03f12de pageserver: refactor layer upload out of update_metadata_file 2023-10-24 10:38:35 +01:00
4 changed files with 140 additions and 64 deletions

View File

@@ -691,11 +691,9 @@ impl RemoteTimelineClient {
upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
Some((name, meta.generation))
} else {
// This can only happen if we forgot to to schedule the file upload
// before scheduling the delete. Log it because it is a rare/strange
// situation, and in case something is misbehaving, we'd like to know which
// layers experienced this.
info!(
// This should never happen: callers should ensure that any layer they
// request deletion of has already been scheduled for upload
error!(
"Deleting layer {name} not found in latest_files list, never uploaded?"
);
None
@@ -1099,7 +1097,7 @@ impl RemoteTimelineClient {
.timeline_path(&self.tenant_id, &self.timeline_id)
.join(layer_file_name.file_name());
upload::upload_timeline_layer(
let result = upload::upload_timeline_layer(
self.conf,
&self.storage_impl,
&path,
@@ -1113,7 +1111,27 @@ impl RemoteTimelineClient {
RemoteOpKind::Upload,
Arc::clone(&self.metrics),
)
.await
.await;
if let Ok(outcome) = &result {
match outcome {
upload::UploadOutcome::NotFound => {
// Layer uploads can be no-ops if the file is not found on local disk. In this case, we must
// update our remote metadata to reflect that the file doesn't exist (it was added to `latest_files`)
// when scheduled.
let mut guard = self.upload_queue.lock().unwrap();
if let Ok(upload_queue) = guard.initialized_mut() {
upload_queue.latest_files.remove(layer_file_name);
upload_queue
.latest_files_changes_since_metadata_upload_scheduled += 1;
}
}
upload::UploadOutcome::Uploaded => {
// Success, no special handling required.
}
}
}
result.map(|_| ())
}
UploadOp::UploadMetadata(ref index_part, _lsn) => {
let mention_having_future_layers = if cfg!(feature = "testing") {

View File

@@ -45,6 +45,11 @@ pub(super) async fn upload_index_part<'a>(
.with_context(|| format!("upload index part for '{tenant_id} / {timeline_id}'"))
}
pub(super) enum UploadOutcome {
Uploaded,
NotFound,
}
/// Attempts to upload given layer files.
/// No extra checks for overlapping files is made and any files that are already present remotely will be overwritten, if submitted during the upload.
///
@@ -55,11 +60,13 @@ pub(super) async fn upload_timeline_layer<'a>(
source_path: &'a Utf8Path,
known_metadata: &'a LayerFileMetadata,
generation: Generation,
) -> anyhow::Result<()> {
) -> anyhow::Result<UploadOutcome> {
fail_point!("before-upload-layer", |_| {
bail!("failpoint before-upload-layer")
});
pausable_failpoint!("before-upload-layer-pausable");
let storage_path = remote_path(conf, source_path, generation)?;
let source_file_res = fs::File::open(&source_path).await;
let source_file = match source_file_res {
@@ -71,7 +78,7 @@ pub(super) async fn upload_timeline_layer<'a>(
// something worse, like when a file is scheduled for upload before
// it has been written to disk yet.
info!(path = %source_path, "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more.");
return Ok(());
return Ok(UploadOutcome::NotFound);
}
Err(e) => {
Err(e).with_context(|| format!("open a source file for layer {source_path:?}"))?
@@ -97,5 +104,5 @@ pub(super) async fn upload_timeline_layer<'a>(
.await
.with_context(|| format!("upload layer from local path '{source_path}'"))?;
Ok(())
Ok(UploadOutcome::Uploaded)
}

View File

@@ -2793,10 +2793,13 @@ impl Timeline {
)
};
let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
// The new on-disk layers are now in the layer map. We can remove the
// in-memory layer from the map now. The flushed layer is stored in
// the mapping in `create_delta_layer`.
{
let metadata = {
let mut guard = self.layers.write().await;
if let Some(ref l) = delta_layer_to_add {
@@ -2812,8 +2815,17 @@ impl Timeline {
}
guard.finish_flush_l0_layer(delta_layer_to_add, &frozen_layer);
if disk_consistent_lsn != old_disk_consistent_lsn {
assert!(disk_consistent_lsn > old_disk_consistent_lsn);
self.disk_consistent_lsn.store(disk_consistent_lsn);
// Schedule remote uploads that will reflect our new disk_consistent_lsn
Some(self.schedule_uploads(disk_consistent_lsn, layer_paths_to_upload)?)
} else {
None
}
// release lock on 'layers'
}
};
// FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
// a compaction can delete the file and then it won't be available for uploads any more.
@@ -2829,28 +2841,22 @@ impl Timeline {
//
// TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
// *all* the layers, to avoid fsyncing the file multiple times.
let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
// If we were able to advance 'disk_consistent_lsn', save it the metadata file.
// After crash, we will restart WAL streaming and processing from that point.
if disk_consistent_lsn != old_disk_consistent_lsn {
assert!(disk_consistent_lsn > old_disk_consistent_lsn);
self.update_metadata_file(disk_consistent_lsn, layer_paths_to_upload)
// If we updated our disk_consistent_lsn, persist the updated metadata to local disk.
if let Some(metadata) = metadata {
save_metadata(self.conf, &self.tenant_id, &self.timeline_id, &metadata)
.await
.context("update_metadata_file")?;
// Also update the in-memory copy
self.disk_consistent_lsn.store(disk_consistent_lsn);
.context("save_metadata")?;
}
Ok(())
}
/// Update metadata file
async fn update_metadata_file(
fn schedule_uploads(
&self,
disk_consistent_lsn: Lsn,
layer_paths_to_upload: HashMap<LayerFileName, LayerFileMetadata>,
) -> anyhow::Result<()> {
) -> anyhow::Result<TimelineMetadata> {
// We can only save a valid 'prev_record_lsn' value on disk if we
// flushed *all* in-memory changes to disk. We only track
// 'prev_record_lsn' in memory for the latest processed record, so we
@@ -2887,10 +2893,6 @@ impl Timeline {
x.unwrap()
));
save_metadata(self.conf, &self.tenant_id, &self.timeline_id, &metadata)
.await
.context("save_metadata")?;
if let Some(remote_client) = &self.remote_client {
for (path, layer_metadata) in layer_paths_to_upload {
remote_client.schedule_layer_file_upload(&path, &layer_metadata)?;
@@ -2898,6 +2900,20 @@ impl Timeline {
remote_client.schedule_index_upload_for_metadata_update(&metadata)?;
}
Ok(metadata)
}
async fn update_metadata_file(
&self,
disk_consistent_lsn: Lsn,
layer_paths_to_upload: HashMap<LayerFileName, LayerFileMetadata>,
) -> anyhow::Result<()> {
let metadata = self.schedule_uploads(disk_consistent_lsn, layer_paths_to_upload)?;
save_metadata(self.conf, &self.tenant_id, &self.timeline_id, &metadata)
.await
.context("save_metadata")?;
Ok(())
}
@@ -4107,17 +4123,6 @@ impl Timeline {
debug!("retain_lsns: {:?}", retain_lsns);
// Before deleting any layers, we need to wait for their upload ops to finish.
// See storage_sync module level comment on consistency.
// Do it here because we don't want to hold self.layers.write() while waiting.
if let Some(remote_client) = &self.remote_client {
debug!("waiting for upload ops to complete");
remote_client
.wait_completion()
.await
.context("wait for layer upload ops to complete")?;
}
let mut layers_to_remove = Vec::new();
let mut wanted_image_layers = KeySpaceRandomAccum::default();
@@ -4228,6 +4233,18 @@ impl Timeline {
.replace((new_gc_cutoff, wanted_image_layers.to_keyspace()));
if !layers_to_remove.is_empty() {
// Before deleting any layers, we need to wait for their upload ops to finish.
// Presence of a layer in LayerManager implies it has been schedule for upload,
// so by waiting for uploads _after_ walking the layers, we are guaranteed that
// everything we are deleting was uploaded.
if let Some(remote_client) = &self.remote_client {
debug!("waiting for upload ops to complete");
remote_client
.wait_completion()
.await
.context("wait for layer upload ops to complete")?;
}
// Persist the new GC cutoff value in the metadata file, before
// we actually remove anything.
self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())

View File

@@ -757,12 +757,14 @@ def test_empty_branch_remote_storage_upload_on_restart(neon_env_builder: NeonEnv
create_thread.join()
# Regression test for a race condition where L0 layers are compacted before the upload,
# resulting in the uploading complaining about the file not being found
# https://github.com/neondatabase/neon/issues/4526
def test_compaction_delete_before_upload(
def test_compaction_waits_for_upload(
neon_env_builder: NeonEnvBuilder,
):
"""
Compaction waits for outstanding uploads to complete, so that it avoids deleting layers
files that have not yet been uploaded. This test forces a race between upload and
compaction.
"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start(
@@ -792,50 +794,82 @@ def test_compaction_delete_before_upload(
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
# Now make the flushing hang and update one small piece of data
client.configure_failpoints(("flush-frozen-pausable", "pause"))
client.configure_failpoints(("before-upload-layer-pausable", "pause"))
endpoint.safe_psql("UPDATE foo SET x = 0 WHERE x = 1")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
q: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
barrier = threading.Barrier(2)
checkpoint_result: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
compact_result: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
compact_barrier = threading.Barrier(2)
def checkpoint_in_background():
barrier.wait()
try:
log.info("Checkpoint starting")
client.timeline_checkpoint(tenant_id, timeline_id)
q.put(None)
log.info("Checkpoint complete")
checkpoint_result.put(None)
except PageserverApiException as e:
q.put(e)
log.info("Checkpoint errored: {e}")
checkpoint_result.put(e)
create_thread = threading.Thread(target=checkpoint_in_background)
create_thread.start()
def compact_in_background():
compact_barrier.wait()
try:
log.info("Compaction starting")
client.timeline_compact(tenant_id, timeline_id)
log.info("Compaction complete")
compact_result.put(None)
except PageserverApiException as e:
log.info("Compaction errored: {e}")
compact_result.put(e)
checkpoint_thread = threading.Thread(target=checkpoint_in_background)
checkpoint_thread.start()
compact_thread = threading.Thread(target=compact_in_background)
compact_thread.start()
try:
barrier.wait()
# Start the checkpoint, see that it blocks
log.info("Waiting to see checkpoint hang...")
time.sleep(5)
assert checkpoint_result.empty()
time.sleep(4)
client.timeline_compact(tenant_id, timeline_id)
# Start the compaction, see that it finds work to do but blocks
compact_barrier.wait()
log.info("Waiting to see compaction hang...")
time.sleep(5)
assert compact_barrier.n_waiting == 0
assert compact_result.empty()
client.configure_failpoints(("flush-frozen-pausable", "off"))
# This is logged once compaction is started, but before we wait for operations to complete
assert env.pageserver.log_contains("compact_level0_phase1 stats available.")
conflict = q.get()
# Once we unblock uploads the compaction should complete successfully
log.info("Disabling failpoint")
client.configure_failpoints(("before-upload-layer-pausable", "off"))
log.info("Awaiting compaction result")
assert compact_result.get(timeout=10) is None
log.info("Awaiting checkpoint result")
assert checkpoint_result.get(timeout=10) is None
assert conflict is None
except Exception:
# Log the actual failure's backtrace here, before we proceed to join threads
log.exception("Failure, cleaning up...")
raise
finally:
create_thread.join()
compact_barrier.abort()
# Add a delay for the uploads to run into either the file not found or the
time.sleep(4)
checkpoint_thread.join()
compact_thread.join()
# Ensure that this actually terminates
wait_upload_queue_empty(client, tenant_id, timeline_id)
# For now we are hitting this message.
# Maybe in the future the underlying race condition will be fixed,
# but until then, ensure that this message is hit instead.
assert env.pageserver.log_contains(
# We should not have hit the error handling path in uploads where the remote file is gone
assert not env.pageserver.log_contains(
"File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more."
)