mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-28 18:40:38 +00:00
Compare commits
2 Commits
jcsp/issue
...
bojan-tmp-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a8fd6266aa | ||
|
|
151605d751 |
@@ -36,7 +36,7 @@ use utils::pid_file::{self, PidFileRead};
|
||||
// it's waiting. If the process hasn't started/stopped after 5 seconds,
|
||||
// it prints a notice that it's taking long, but keeps waiting.
|
||||
//
|
||||
const RETRY_UNTIL_SECS: u64 = 10;
|
||||
const RETRY_UNTIL_SECS: u64 = 10000;
|
||||
const RETRIES: u64 = (RETRY_UNTIL_SECS * 1000) / RETRY_INTERVAL_MILLIS;
|
||||
const RETRY_INTERVAL_MILLIS: u64 = 100;
|
||||
const DOT_EVERY_RETRIES: u64 = 10;
|
||||
|
||||
@@ -267,12 +267,6 @@ async fn azure_upload_download_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Res
|
||||
let buf = download_and_compare(dl).await?;
|
||||
assert_eq!(buf, data);
|
||||
|
||||
debug!("Cleanup: deleting file at path {path:?}");
|
||||
ctx.client
|
||||
.delete(&path)
|
||||
.await
|
||||
.with_context(|| format!("{path:?} removal"))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -691,9 +691,11 @@ impl RemoteTimelineClient {
|
||||
upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
|
||||
Some((name, meta.generation))
|
||||
} else {
|
||||
// This should never happen: callers should ensure that any layer they
|
||||
// request deletion of has already been scheduled for upload
|
||||
error!(
|
||||
// This can only happen if we forgot to to schedule the file upload
|
||||
// before scheduling the delete. Log it because it is a rare/strange
|
||||
// situation, and in case something is misbehaving, we'd like to know which
|
||||
// layers experienced this.
|
||||
info!(
|
||||
"Deleting layer {name} not found in latest_files list, never uploaded?"
|
||||
);
|
||||
None
|
||||
@@ -1097,7 +1099,7 @@ impl RemoteTimelineClient {
|
||||
.timeline_path(&self.tenant_id, &self.timeline_id)
|
||||
.join(layer_file_name.file_name());
|
||||
|
||||
let result = upload::upload_timeline_layer(
|
||||
upload::upload_timeline_layer(
|
||||
self.conf,
|
||||
&self.storage_impl,
|
||||
&path,
|
||||
@@ -1111,27 +1113,7 @@ impl RemoteTimelineClient {
|
||||
RemoteOpKind::Upload,
|
||||
Arc::clone(&self.metrics),
|
||||
)
|
||||
.await;
|
||||
|
||||
if let Ok(outcome) = &result {
|
||||
match outcome {
|
||||
upload::UploadOutcome::NotFound => {
|
||||
// Layer uploads can be no-ops if the file is not found on local disk. In this case, we must
|
||||
// update our remote metadata to reflect that the file doesn't exist (it was added to `latest_files`)
|
||||
// when scheduled.
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
if let Ok(upload_queue) = guard.initialized_mut() {
|
||||
upload_queue.latest_files.remove(layer_file_name);
|
||||
upload_queue
|
||||
.latest_files_changes_since_metadata_upload_scheduled += 1;
|
||||
}
|
||||
}
|
||||
upload::UploadOutcome::Uploaded => {
|
||||
// Success, no special handling required.
|
||||
}
|
||||
}
|
||||
}
|
||||
result.map(|_| ())
|
||||
.await
|
||||
}
|
||||
UploadOp::UploadMetadata(ref index_part, _lsn) => {
|
||||
let mention_having_future_layers = if cfg!(feature = "testing") {
|
||||
|
||||
@@ -45,11 +45,6 @@ pub(super) async fn upload_index_part<'a>(
|
||||
.with_context(|| format!("upload index part for '{tenant_id} / {timeline_id}'"))
|
||||
}
|
||||
|
||||
pub(super) enum UploadOutcome {
|
||||
Uploaded,
|
||||
NotFound,
|
||||
}
|
||||
|
||||
/// Attempts to upload given layer files.
|
||||
/// No extra checks for overlapping files is made and any files that are already present remotely will be overwritten, if submitted during the upload.
|
||||
///
|
||||
@@ -60,13 +55,11 @@ pub(super) async fn upload_timeline_layer<'a>(
|
||||
source_path: &'a Utf8Path,
|
||||
known_metadata: &'a LayerFileMetadata,
|
||||
generation: Generation,
|
||||
) -> anyhow::Result<UploadOutcome> {
|
||||
) -> anyhow::Result<()> {
|
||||
fail_point!("before-upload-layer", |_| {
|
||||
bail!("failpoint before-upload-layer")
|
||||
});
|
||||
|
||||
pausable_failpoint!("before-upload-layer-pausable");
|
||||
|
||||
let storage_path = remote_path(conf, source_path, generation)?;
|
||||
let source_file_res = fs::File::open(&source_path).await;
|
||||
let source_file = match source_file_res {
|
||||
@@ -78,7 +71,7 @@ pub(super) async fn upload_timeline_layer<'a>(
|
||||
// something worse, like when a file is scheduled for upload before
|
||||
// it has been written to disk yet.
|
||||
info!(path = %source_path, "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more.");
|
||||
return Ok(UploadOutcome::NotFound);
|
||||
return Ok(());
|
||||
}
|
||||
Err(e) => {
|
||||
Err(e).with_context(|| format!("open a source file for layer {source_path:?}"))?
|
||||
@@ -104,5 +97,5 @@ pub(super) async fn upload_timeline_layer<'a>(
|
||||
.await
|
||||
.with_context(|| format!("upload layer from local path '{source_path}'"))?;
|
||||
|
||||
Ok(UploadOutcome::Uploaded)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -2793,13 +2793,10 @@ impl Timeline {
|
||||
)
|
||||
};
|
||||
|
||||
let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
|
||||
let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
|
||||
|
||||
// The new on-disk layers are now in the layer map. We can remove the
|
||||
// in-memory layer from the map now. The flushed layer is stored in
|
||||
// the mapping in `create_delta_layer`.
|
||||
let metadata = {
|
||||
{
|
||||
let mut guard = self.layers.write().await;
|
||||
|
||||
if let Some(ref l) = delta_layer_to_add {
|
||||
@@ -2815,17 +2812,8 @@ impl Timeline {
|
||||
}
|
||||
|
||||
guard.finish_flush_l0_layer(delta_layer_to_add, &frozen_layer);
|
||||
if disk_consistent_lsn != old_disk_consistent_lsn {
|
||||
assert!(disk_consistent_lsn > old_disk_consistent_lsn);
|
||||
self.disk_consistent_lsn.store(disk_consistent_lsn);
|
||||
|
||||
// Schedule remote uploads that will reflect our new disk_consistent_lsn
|
||||
Some(self.schedule_uploads(disk_consistent_lsn, layer_paths_to_upload)?)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
// release lock on 'layers'
|
||||
};
|
||||
}
|
||||
|
||||
// FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
|
||||
// a compaction can delete the file and then it won't be available for uploads any more.
|
||||
@@ -2841,22 +2829,28 @@ impl Timeline {
|
||||
//
|
||||
// TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
|
||||
// *all* the layers, to avoid fsyncing the file multiple times.
|
||||
let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
|
||||
let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
|
||||
|
||||
// If we updated our disk_consistent_lsn, persist the updated metadata to local disk.
|
||||
if let Some(metadata) = metadata {
|
||||
save_metadata(self.conf, &self.tenant_id, &self.timeline_id, &metadata)
|
||||
// If we were able to advance 'disk_consistent_lsn', save it the metadata file.
|
||||
// After crash, we will restart WAL streaming and processing from that point.
|
||||
if disk_consistent_lsn != old_disk_consistent_lsn {
|
||||
assert!(disk_consistent_lsn > old_disk_consistent_lsn);
|
||||
self.update_metadata_file(disk_consistent_lsn, layer_paths_to_upload)
|
||||
.await
|
||||
.context("save_metadata")?;
|
||||
.context("update_metadata_file")?;
|
||||
// Also update the in-memory copy
|
||||
self.disk_consistent_lsn.store(disk_consistent_lsn);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Update metadata file
|
||||
fn schedule_uploads(
|
||||
async fn update_metadata_file(
|
||||
&self,
|
||||
disk_consistent_lsn: Lsn,
|
||||
layer_paths_to_upload: HashMap<LayerFileName, LayerFileMetadata>,
|
||||
) -> anyhow::Result<TimelineMetadata> {
|
||||
) -> anyhow::Result<()> {
|
||||
// We can only save a valid 'prev_record_lsn' value on disk if we
|
||||
// flushed *all* in-memory changes to disk. We only track
|
||||
// 'prev_record_lsn' in memory for the latest processed record, so we
|
||||
@@ -2893,6 +2887,10 @@ impl Timeline {
|
||||
x.unwrap()
|
||||
));
|
||||
|
||||
save_metadata(self.conf, &self.tenant_id, &self.timeline_id, &metadata)
|
||||
.await
|
||||
.context("save_metadata")?;
|
||||
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
for (path, layer_metadata) in layer_paths_to_upload {
|
||||
remote_client.schedule_layer_file_upload(&path, &layer_metadata)?;
|
||||
@@ -2900,20 +2898,6 @@ impl Timeline {
|
||||
remote_client.schedule_index_upload_for_metadata_update(&metadata)?;
|
||||
}
|
||||
|
||||
Ok(metadata)
|
||||
}
|
||||
|
||||
async fn update_metadata_file(
|
||||
&self,
|
||||
disk_consistent_lsn: Lsn,
|
||||
layer_paths_to_upload: HashMap<LayerFileName, LayerFileMetadata>,
|
||||
) -> anyhow::Result<()> {
|
||||
let metadata = self.schedule_uploads(disk_consistent_lsn, layer_paths_to_upload)?;
|
||||
|
||||
save_metadata(self.conf, &self.tenant_id, &self.timeline_id, &metadata)
|
||||
.await
|
||||
.context("save_metadata")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -4123,6 +4107,17 @@ impl Timeline {
|
||||
|
||||
debug!("retain_lsns: {:?}", retain_lsns);
|
||||
|
||||
// Before deleting any layers, we need to wait for their upload ops to finish.
|
||||
// See storage_sync module level comment on consistency.
|
||||
// Do it here because we don't want to hold self.layers.write() while waiting.
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
debug!("waiting for upload ops to complete");
|
||||
remote_client
|
||||
.wait_completion()
|
||||
.await
|
||||
.context("wait for layer upload ops to complete")?;
|
||||
}
|
||||
|
||||
let mut layers_to_remove = Vec::new();
|
||||
let mut wanted_image_layers = KeySpaceRandomAccum::default();
|
||||
|
||||
@@ -4233,18 +4228,6 @@ impl Timeline {
|
||||
.replace((new_gc_cutoff, wanted_image_layers.to_keyspace()));
|
||||
|
||||
if !layers_to_remove.is_empty() {
|
||||
// Before deleting any layers, we need to wait for their upload ops to finish.
|
||||
// Presence of a layer in LayerManager implies it has been schedule for upload,
|
||||
// so by waiting for uploads _after_ walking the layers, we are guaranteed that
|
||||
// everything we are deleting was uploaded.
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
debug!("waiting for upload ops to complete");
|
||||
remote_client
|
||||
.wait_completion()
|
||||
.await
|
||||
.context("wait for layer upload ops to complete")?;
|
||||
}
|
||||
|
||||
// Persist the new GC cutoff value in the metadata file, before
|
||||
// we actually remove anything.
|
||||
self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())
|
||||
|
||||
@@ -157,7 +157,7 @@ def wait_for_last_record_lsn(
|
||||
lsn: Lsn,
|
||||
) -> Lsn:
|
||||
"""waits for pageserver to catch up to a certain lsn, returns the last observed lsn."""
|
||||
for i in range(100):
|
||||
for i in range(1000000):
|
||||
current_lsn = last_record_lsn(pageserver_http, tenant, timeline)
|
||||
if current_lsn >= lsn:
|
||||
return current_lsn
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
import pytest
|
||||
import os
|
||||
import shutil
|
||||
from contextlib import closing
|
||||
from fixtures.log_helper import log
|
||||
|
||||
from fixtures.compare_fixtures import NeonCompare, PgCompare
|
||||
from fixtures.pg_version import PgVersion
|
||||
@@ -18,6 +21,9 @@ from fixtures.pg_version import PgVersion
|
||||
def test_bulk_insert(neon_with_baseline: PgCompare):
|
||||
env = neon_with_baseline
|
||||
|
||||
# Number of times to run the write query. One run creates 350MB of wal.
|
||||
n_writes = 10
|
||||
|
||||
with closing(env.pg.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("create table huge (i int, j int);")
|
||||
@@ -25,7 +31,10 @@ def test_bulk_insert(neon_with_baseline: PgCompare):
|
||||
# Run INSERT, recording the time and I/O it takes
|
||||
with env.record_pageserver_writes("pageserver_writes"):
|
||||
with env.record_duration("insert"):
|
||||
cur.execute("insert into huge values (generate_series(1, 5000000), 0);")
|
||||
for i in range(n_writes):
|
||||
if n_writes > 1:
|
||||
log.info(f"running query {i}/{n_writes}")
|
||||
cur.execute("insert into huge values (generate_series(1, 5000000), 0);")
|
||||
env.flush()
|
||||
|
||||
env.report_peak_memory_use()
|
||||
@@ -39,7 +48,9 @@ def test_bulk_insert(neon_with_baseline: PgCompare):
|
||||
|
||||
|
||||
def measure_recovery_time(env: NeonCompare):
|
||||
client = env.env.pageserver.http_client()
|
||||
# Hmm why is pageserver less ready to respond to http when the datadir is large?
|
||||
from urllib3.util.retry import Retry
|
||||
client = env.env.pageserver.http_client(retries=Retry(1000))
|
||||
pg_version = PgVersion(client.timeline_detail(env.tenant, env.timeline)["pg_version"])
|
||||
|
||||
# Stop pageserver and remove tenant data
|
||||
@@ -57,3 +68,13 @@ def measure_recovery_time(env: NeonCompare):
|
||||
|
||||
# Flush, which will also wait for lsn to catch up
|
||||
env.flush()
|
||||
|
||||
|
||||
# This test is meant for local iteration only. The use case is when you want to re-run
|
||||
# the measure_recovery_time part of test_bulk_insert, but without running the setup.
|
||||
# It allows you to iterate on results 2x faster while trying to improve wal ingestion
|
||||
# performance.
|
||||
@pytest.mark.skip("this is a convenience test for local dev only")
|
||||
def test_recovery(neon_env_builder):
|
||||
env = neon_env_builder.init_start()
|
||||
measure_recovery_time(env)
|
||||
|
||||
@@ -757,14 +757,12 @@ def test_empty_branch_remote_storage_upload_on_restart(neon_env_builder: NeonEnv
|
||||
create_thread.join()
|
||||
|
||||
|
||||
def test_compaction_waits_for_upload(
|
||||
# Regression test for a race condition where L0 layers are compacted before the upload,
|
||||
# resulting in the uploading complaining about the file not being found
|
||||
# https://github.com/neondatabase/neon/issues/4526
|
||||
def test_compaction_delete_before_upload(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
"""
|
||||
Compaction waits for outstanding uploads to complete, so that it avoids deleting layers
|
||||
files that have not yet been uploaded. This test forces a race between upload and
|
||||
compaction.
|
||||
"""
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
env = neon_env_builder.init_start(
|
||||
@@ -794,82 +792,50 @@ def test_compaction_waits_for_upload(
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
# Now make the flushing hang and update one small piece of data
|
||||
client.configure_failpoints(("before-upload-layer-pausable", "pause"))
|
||||
client.configure_failpoints(("flush-frozen-pausable", "pause"))
|
||||
|
||||
endpoint.safe_psql("UPDATE foo SET x = 0 WHERE x = 1")
|
||||
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
checkpoint_result: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
|
||||
compact_result: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
|
||||
compact_barrier = threading.Barrier(2)
|
||||
q: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
|
||||
barrier = threading.Barrier(2)
|
||||
|
||||
def checkpoint_in_background():
|
||||
barrier.wait()
|
||||
try:
|
||||
log.info("Checkpoint starting")
|
||||
client.timeline_checkpoint(tenant_id, timeline_id)
|
||||
log.info("Checkpoint complete")
|
||||
checkpoint_result.put(None)
|
||||
q.put(None)
|
||||
except PageserverApiException as e:
|
||||
log.info("Checkpoint errored: {e}")
|
||||
checkpoint_result.put(e)
|
||||
q.put(e)
|
||||
|
||||
def compact_in_background():
|
||||
compact_barrier.wait()
|
||||
try:
|
||||
log.info("Compaction starting")
|
||||
client.timeline_compact(tenant_id, timeline_id)
|
||||
log.info("Compaction complete")
|
||||
compact_result.put(None)
|
||||
except PageserverApiException as e:
|
||||
log.info("Compaction errored: {e}")
|
||||
compact_result.put(e)
|
||||
|
||||
checkpoint_thread = threading.Thread(target=checkpoint_in_background)
|
||||
checkpoint_thread.start()
|
||||
|
||||
compact_thread = threading.Thread(target=compact_in_background)
|
||||
compact_thread.start()
|
||||
create_thread = threading.Thread(target=checkpoint_in_background)
|
||||
create_thread.start()
|
||||
|
||||
try:
|
||||
# Start the checkpoint, see that it blocks
|
||||
log.info("Waiting to see checkpoint hang...")
|
||||
time.sleep(5)
|
||||
assert checkpoint_result.empty()
|
||||
barrier.wait()
|
||||
|
||||
# Start the compaction, see that it finds work to do but blocks
|
||||
compact_barrier.wait()
|
||||
log.info("Waiting to see compaction hang...")
|
||||
time.sleep(5)
|
||||
assert compact_barrier.n_waiting == 0
|
||||
assert compact_result.empty()
|
||||
time.sleep(4)
|
||||
client.timeline_compact(tenant_id, timeline_id)
|
||||
|
||||
# This is logged once compaction is started, but before we wait for operations to complete
|
||||
assert env.pageserver.log_contains("compact_level0_phase1 stats available.")
|
||||
client.configure_failpoints(("flush-frozen-pausable", "off"))
|
||||
|
||||
# Once we unblock uploads the compaction should complete successfully
|
||||
log.info("Disabling failpoint")
|
||||
client.configure_failpoints(("before-upload-layer-pausable", "off"))
|
||||
log.info("Awaiting compaction result")
|
||||
assert compact_result.get(timeout=10) is None
|
||||
log.info("Awaiting checkpoint result")
|
||||
assert checkpoint_result.get(timeout=10) is None
|
||||
conflict = q.get()
|
||||
|
||||
except Exception:
|
||||
# Log the actual failure's backtrace here, before we proceed to join threads
|
||||
log.exception("Failure, cleaning up...")
|
||||
raise
|
||||
assert conflict is None
|
||||
finally:
|
||||
compact_barrier.abort()
|
||||
create_thread.join()
|
||||
|
||||
checkpoint_thread.join()
|
||||
compact_thread.join()
|
||||
# Add a delay for the uploads to run into either the file not found or the
|
||||
time.sleep(4)
|
||||
|
||||
# Ensure that this actually terminates
|
||||
wait_upload_queue_empty(client, tenant_id, timeline_id)
|
||||
|
||||
# We should not have hit the error handling path in uploads where the remote file is gone
|
||||
assert not env.pageserver.log_contains(
|
||||
# For now we are hitting this message.
|
||||
# Maybe in the future the underlying race condition will be fixed,
|
||||
# but until then, ensure that this message is hit instead.
|
||||
assert env.pageserver.log_contains(
|
||||
"File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more."
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user