Compare commits

..

2 Commits

Author SHA1 Message Date
Bojan Serafimov
a8fd6266aa wip 2023-10-27 11:42:18 -04:00
Bojan Serafimov
151605d751 wip 2023-10-24 13:11:40 -04:00
8 changed files with 89 additions and 150 deletions

View File

@@ -36,7 +36,7 @@ use utils::pid_file::{self, PidFileRead};
// it's waiting. If the process hasn't started/stopped after 5 seconds,
// it prints a notice that it's taking long, but keeps waiting.
//
const RETRY_UNTIL_SECS: u64 = 10;
const RETRY_UNTIL_SECS: u64 = 10000;
const RETRIES: u64 = (RETRY_UNTIL_SECS * 1000) / RETRY_INTERVAL_MILLIS;
const RETRY_INTERVAL_MILLIS: u64 = 100;
const DOT_EVERY_RETRIES: u64 = 10;

View File

@@ -267,12 +267,6 @@ async fn azure_upload_download_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Res
let buf = download_and_compare(dl).await?;
assert_eq!(buf, data);
debug!("Cleanup: deleting file at path {path:?}");
ctx.client
.delete(&path)
.await
.with_context(|| format!("{path:?} removal"))?;
Ok(())
}

View File

@@ -691,9 +691,11 @@ impl RemoteTimelineClient {
upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
Some((name, meta.generation))
} else {
// This should never happen: callers should ensure that any layer they
// request deletion of has already been scheduled for upload
error!(
// This can only happen if we forgot to to schedule the file upload
// before scheduling the delete. Log it because it is a rare/strange
// situation, and in case something is misbehaving, we'd like to know which
// layers experienced this.
info!(
"Deleting layer {name} not found in latest_files list, never uploaded?"
);
None
@@ -1097,7 +1099,7 @@ impl RemoteTimelineClient {
.timeline_path(&self.tenant_id, &self.timeline_id)
.join(layer_file_name.file_name());
let result = upload::upload_timeline_layer(
upload::upload_timeline_layer(
self.conf,
&self.storage_impl,
&path,
@@ -1111,27 +1113,7 @@ impl RemoteTimelineClient {
RemoteOpKind::Upload,
Arc::clone(&self.metrics),
)
.await;
if let Ok(outcome) = &result {
match outcome {
upload::UploadOutcome::NotFound => {
// Layer uploads can be no-ops if the file is not found on local disk. In this case, we must
// update our remote metadata to reflect that the file doesn't exist (it was added to `latest_files`)
// when scheduled.
let mut guard = self.upload_queue.lock().unwrap();
if let Ok(upload_queue) = guard.initialized_mut() {
upload_queue.latest_files.remove(layer_file_name);
upload_queue
.latest_files_changes_since_metadata_upload_scheduled += 1;
}
}
upload::UploadOutcome::Uploaded => {
// Success, no special handling required.
}
}
}
result.map(|_| ())
.await
}
UploadOp::UploadMetadata(ref index_part, _lsn) => {
let mention_having_future_layers = if cfg!(feature = "testing") {

View File

@@ -45,11 +45,6 @@ pub(super) async fn upload_index_part<'a>(
.with_context(|| format!("upload index part for '{tenant_id} / {timeline_id}'"))
}
pub(super) enum UploadOutcome {
Uploaded,
NotFound,
}
/// Attempts to upload given layer files.
/// No extra checks for overlapping files is made and any files that are already present remotely will be overwritten, if submitted during the upload.
///
@@ -60,13 +55,11 @@ pub(super) async fn upload_timeline_layer<'a>(
source_path: &'a Utf8Path,
known_metadata: &'a LayerFileMetadata,
generation: Generation,
) -> anyhow::Result<UploadOutcome> {
) -> anyhow::Result<()> {
fail_point!("before-upload-layer", |_| {
bail!("failpoint before-upload-layer")
});
pausable_failpoint!("before-upload-layer-pausable");
let storage_path = remote_path(conf, source_path, generation)?;
let source_file_res = fs::File::open(&source_path).await;
let source_file = match source_file_res {
@@ -78,7 +71,7 @@ pub(super) async fn upload_timeline_layer<'a>(
// something worse, like when a file is scheduled for upload before
// it has been written to disk yet.
info!(path = %source_path, "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more.");
return Ok(UploadOutcome::NotFound);
return Ok(());
}
Err(e) => {
Err(e).with_context(|| format!("open a source file for layer {source_path:?}"))?
@@ -104,5 +97,5 @@ pub(super) async fn upload_timeline_layer<'a>(
.await
.with_context(|| format!("upload layer from local path '{source_path}'"))?;
Ok(UploadOutcome::Uploaded)
Ok(())
}

View File

@@ -2793,13 +2793,10 @@ impl Timeline {
)
};
let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
// The new on-disk layers are now in the layer map. We can remove the
// in-memory layer from the map now. The flushed layer is stored in
// the mapping in `create_delta_layer`.
let metadata = {
{
let mut guard = self.layers.write().await;
if let Some(ref l) = delta_layer_to_add {
@@ -2815,17 +2812,8 @@ impl Timeline {
}
guard.finish_flush_l0_layer(delta_layer_to_add, &frozen_layer);
if disk_consistent_lsn != old_disk_consistent_lsn {
assert!(disk_consistent_lsn > old_disk_consistent_lsn);
self.disk_consistent_lsn.store(disk_consistent_lsn);
// Schedule remote uploads that will reflect our new disk_consistent_lsn
Some(self.schedule_uploads(disk_consistent_lsn, layer_paths_to_upload)?)
} else {
None
}
// release lock on 'layers'
};
}
// FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
// a compaction can delete the file and then it won't be available for uploads any more.
@@ -2841,22 +2829,28 @@ impl Timeline {
//
// TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
// *all* the layers, to avoid fsyncing the file multiple times.
let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
// If we updated our disk_consistent_lsn, persist the updated metadata to local disk.
if let Some(metadata) = metadata {
save_metadata(self.conf, &self.tenant_id, &self.timeline_id, &metadata)
// If we were able to advance 'disk_consistent_lsn', save it the metadata file.
// After crash, we will restart WAL streaming and processing from that point.
if disk_consistent_lsn != old_disk_consistent_lsn {
assert!(disk_consistent_lsn > old_disk_consistent_lsn);
self.update_metadata_file(disk_consistent_lsn, layer_paths_to_upload)
.await
.context("save_metadata")?;
.context("update_metadata_file")?;
// Also update the in-memory copy
self.disk_consistent_lsn.store(disk_consistent_lsn);
}
Ok(())
}
/// Update metadata file
fn schedule_uploads(
async fn update_metadata_file(
&self,
disk_consistent_lsn: Lsn,
layer_paths_to_upload: HashMap<LayerFileName, LayerFileMetadata>,
) -> anyhow::Result<TimelineMetadata> {
) -> anyhow::Result<()> {
// We can only save a valid 'prev_record_lsn' value on disk if we
// flushed *all* in-memory changes to disk. We only track
// 'prev_record_lsn' in memory for the latest processed record, so we
@@ -2893,6 +2887,10 @@ impl Timeline {
x.unwrap()
));
save_metadata(self.conf, &self.tenant_id, &self.timeline_id, &metadata)
.await
.context("save_metadata")?;
if let Some(remote_client) = &self.remote_client {
for (path, layer_metadata) in layer_paths_to_upload {
remote_client.schedule_layer_file_upload(&path, &layer_metadata)?;
@@ -2900,20 +2898,6 @@ impl Timeline {
remote_client.schedule_index_upload_for_metadata_update(&metadata)?;
}
Ok(metadata)
}
async fn update_metadata_file(
&self,
disk_consistent_lsn: Lsn,
layer_paths_to_upload: HashMap<LayerFileName, LayerFileMetadata>,
) -> anyhow::Result<()> {
let metadata = self.schedule_uploads(disk_consistent_lsn, layer_paths_to_upload)?;
save_metadata(self.conf, &self.tenant_id, &self.timeline_id, &metadata)
.await
.context("save_metadata")?;
Ok(())
}
@@ -4123,6 +4107,17 @@ impl Timeline {
debug!("retain_lsns: {:?}", retain_lsns);
// Before deleting any layers, we need to wait for their upload ops to finish.
// See storage_sync module level comment on consistency.
// Do it here because we don't want to hold self.layers.write() while waiting.
if let Some(remote_client) = &self.remote_client {
debug!("waiting for upload ops to complete");
remote_client
.wait_completion()
.await
.context("wait for layer upload ops to complete")?;
}
let mut layers_to_remove = Vec::new();
let mut wanted_image_layers = KeySpaceRandomAccum::default();
@@ -4233,18 +4228,6 @@ impl Timeline {
.replace((new_gc_cutoff, wanted_image_layers.to_keyspace()));
if !layers_to_remove.is_empty() {
// Before deleting any layers, we need to wait for their upload ops to finish.
// Presence of a layer in LayerManager implies it has been schedule for upload,
// so by waiting for uploads _after_ walking the layers, we are guaranteed that
// everything we are deleting was uploaded.
if let Some(remote_client) = &self.remote_client {
debug!("waiting for upload ops to complete");
remote_client
.wait_completion()
.await
.context("wait for layer upload ops to complete")?;
}
// Persist the new GC cutoff value in the metadata file, before
// we actually remove anything.
self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())

View File

@@ -157,7 +157,7 @@ def wait_for_last_record_lsn(
lsn: Lsn,
) -> Lsn:
"""waits for pageserver to catch up to a certain lsn, returns the last observed lsn."""
for i in range(100):
for i in range(1000000):
current_lsn = last_record_lsn(pageserver_http, tenant, timeline)
if current_lsn >= lsn:
return current_lsn

View File

@@ -1,5 +1,8 @@
import pytest
import os
import shutil
from contextlib import closing
from fixtures.log_helper import log
from fixtures.compare_fixtures import NeonCompare, PgCompare
from fixtures.pg_version import PgVersion
@@ -18,6 +21,9 @@ from fixtures.pg_version import PgVersion
def test_bulk_insert(neon_with_baseline: PgCompare):
env = neon_with_baseline
# Number of times to run the write query. One run creates 350MB of wal.
n_writes = 10
with closing(env.pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("create table huge (i int, j int);")
@@ -25,7 +31,10 @@ def test_bulk_insert(neon_with_baseline: PgCompare):
# Run INSERT, recording the time and I/O it takes
with env.record_pageserver_writes("pageserver_writes"):
with env.record_duration("insert"):
cur.execute("insert into huge values (generate_series(1, 5000000), 0);")
for i in range(n_writes):
if n_writes > 1:
log.info(f"running query {i}/{n_writes}")
cur.execute("insert into huge values (generate_series(1, 5000000), 0);")
env.flush()
env.report_peak_memory_use()
@@ -39,7 +48,9 @@ def test_bulk_insert(neon_with_baseline: PgCompare):
def measure_recovery_time(env: NeonCompare):
client = env.env.pageserver.http_client()
# Hmm why is pageserver less ready to respond to http when the datadir is large?
from urllib3.util.retry import Retry
client = env.env.pageserver.http_client(retries=Retry(1000))
pg_version = PgVersion(client.timeline_detail(env.tenant, env.timeline)["pg_version"])
# Stop pageserver and remove tenant data
@@ -57,3 +68,13 @@ def measure_recovery_time(env: NeonCompare):
# Flush, which will also wait for lsn to catch up
env.flush()
# This test is meant for local iteration only. The use case is when you want to re-run
# the measure_recovery_time part of test_bulk_insert, but without running the setup.
# It allows you to iterate on results 2x faster while trying to improve wal ingestion
# performance.
@pytest.mark.skip("this is a convenience test for local dev only")
def test_recovery(neon_env_builder):
env = neon_env_builder.init_start()
measure_recovery_time(env)

View File

@@ -757,14 +757,12 @@ def test_empty_branch_remote_storage_upload_on_restart(neon_env_builder: NeonEnv
create_thread.join()
def test_compaction_waits_for_upload(
# Regression test for a race condition where L0 layers are compacted before the upload,
# resulting in the uploading complaining about the file not being found
# https://github.com/neondatabase/neon/issues/4526
def test_compaction_delete_before_upload(
neon_env_builder: NeonEnvBuilder,
):
"""
Compaction waits for outstanding uploads to complete, so that it avoids deleting layers
files that have not yet been uploaded. This test forces a race between upload and
compaction.
"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start(
@@ -794,82 +792,50 @@ def test_compaction_waits_for_upload(
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
# Now make the flushing hang and update one small piece of data
client.configure_failpoints(("before-upload-layer-pausable", "pause"))
client.configure_failpoints(("flush-frozen-pausable", "pause"))
endpoint.safe_psql("UPDATE foo SET x = 0 WHERE x = 1")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
checkpoint_result: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
compact_result: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
compact_barrier = threading.Barrier(2)
q: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
barrier = threading.Barrier(2)
def checkpoint_in_background():
barrier.wait()
try:
log.info("Checkpoint starting")
client.timeline_checkpoint(tenant_id, timeline_id)
log.info("Checkpoint complete")
checkpoint_result.put(None)
q.put(None)
except PageserverApiException as e:
log.info("Checkpoint errored: {e}")
checkpoint_result.put(e)
q.put(e)
def compact_in_background():
compact_barrier.wait()
try:
log.info("Compaction starting")
client.timeline_compact(tenant_id, timeline_id)
log.info("Compaction complete")
compact_result.put(None)
except PageserverApiException as e:
log.info("Compaction errored: {e}")
compact_result.put(e)
checkpoint_thread = threading.Thread(target=checkpoint_in_background)
checkpoint_thread.start()
compact_thread = threading.Thread(target=compact_in_background)
compact_thread.start()
create_thread = threading.Thread(target=checkpoint_in_background)
create_thread.start()
try:
# Start the checkpoint, see that it blocks
log.info("Waiting to see checkpoint hang...")
time.sleep(5)
assert checkpoint_result.empty()
barrier.wait()
# Start the compaction, see that it finds work to do but blocks
compact_barrier.wait()
log.info("Waiting to see compaction hang...")
time.sleep(5)
assert compact_barrier.n_waiting == 0
assert compact_result.empty()
time.sleep(4)
client.timeline_compact(tenant_id, timeline_id)
# This is logged once compaction is started, but before we wait for operations to complete
assert env.pageserver.log_contains("compact_level0_phase1 stats available.")
client.configure_failpoints(("flush-frozen-pausable", "off"))
# Once we unblock uploads the compaction should complete successfully
log.info("Disabling failpoint")
client.configure_failpoints(("before-upload-layer-pausable", "off"))
log.info("Awaiting compaction result")
assert compact_result.get(timeout=10) is None
log.info("Awaiting checkpoint result")
assert checkpoint_result.get(timeout=10) is None
conflict = q.get()
except Exception:
# Log the actual failure's backtrace here, before we proceed to join threads
log.exception("Failure, cleaning up...")
raise
assert conflict is None
finally:
compact_barrier.abort()
create_thread.join()
checkpoint_thread.join()
compact_thread.join()
# Add a delay for the uploads to run into either the file not found or the
time.sleep(4)
# Ensure that this actually terminates
wait_upload_queue_empty(client, tenant_id, timeline_id)
# We should not have hit the error handling path in uploads where the remote file is gone
assert not env.pageserver.log_contains(
# For now we are hitting this message.
# Maybe in the future the underlying race condition will be fixed,
# but until then, ensure that this message is hit instead.
assert env.pageserver.log_contains(
"File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more."
)