mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-23 06:09:59 +00:00
Wait for completion of the upload queue in flush_frozen_layer (#8550)
Makes `flush_frozen_layer` add a barrier to the upload queue and makes it wait for that barrier to be reached until it lets the flushing be completed. This gives us backpressure and ensures that writes can't build up in an unbounded fashion. Fixes #7317
This commit is contained in:
@@ -4,6 +4,11 @@ version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[features]
|
||||
default = []
|
||||
# Enables test specific features.
|
||||
testing = []
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
async-compression.workspace = true
|
||||
|
||||
@@ -400,7 +400,15 @@ impl ComputeNode {
|
||||
pub fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
|
||||
let mut retry_period_ms = 500.0;
|
||||
let mut attempts = 0;
|
||||
let max_attempts = 10;
|
||||
const DEFAULT_ATTEMPTS: u16 = 10;
|
||||
#[cfg(feature = "testing")]
|
||||
let max_attempts = if let Ok(v) = env::var("NEON_COMPUTE_TESTING_BASEBACKUP_RETRIES") {
|
||||
u16::from_str(&v).unwrap()
|
||||
} else {
|
||||
DEFAULT_ATTEMPTS
|
||||
};
|
||||
#[cfg(not(feature = "testing"))]
|
||||
let max_attempts = DEFAULT_ATTEMPTS;
|
||||
loop {
|
||||
let result = self.try_get_basebackup(compute_state, lsn);
|
||||
match result {
|
||||
|
||||
@@ -289,7 +289,7 @@ fn fill_remote_storage_secrets_vars(mut cmd: &mut Command) -> &mut Command {
|
||||
|
||||
fn fill_env_vars_prefixed_neon(mut cmd: &mut Command) -> &mut Command {
|
||||
for (var, val) in std::env::vars() {
|
||||
if var.starts_with("NEON_PAGESERVER_") {
|
||||
if var.starts_with("NEON_") {
|
||||
cmd = cmd.env(var, val);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -143,7 +143,10 @@ use self::walreceiver::{WalReceiver, WalReceiverConf};
|
||||
use super::{config::TenantConf, upload_queue::NotInitialized};
|
||||
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
|
||||
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
|
||||
use super::{remote_timeline_client::RemoteTimelineClient, storage_layer::ReadableLayer};
|
||||
use super::{
|
||||
remote_timeline_client::RemoteTimelineClient, remote_timeline_client::WaitCompletionError,
|
||||
storage_layer::ReadableLayer,
|
||||
};
|
||||
use super::{
|
||||
secondary::heatmap::{HeatMapLayer, HeatMapTimeline},
|
||||
GcError,
|
||||
@@ -4089,6 +4092,21 @@ impl Timeline {
|
||||
// release lock on 'layers'
|
||||
};
|
||||
|
||||
// Backpressure mechanism: wait with continuation of the flush loop until we have uploaded all layer files.
|
||||
// This makes us refuse ingest until the new layers have been persisted to the remote.
|
||||
self.remote_client
|
||||
.wait_completion()
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
WaitCompletionError::UploadQueueShutDownOrStopped
|
||||
| WaitCompletionError::NotInitialized(
|
||||
NotInitialized::ShuttingDown | NotInitialized::Stopped,
|
||||
) => FlushLayerError::Cancelled,
|
||||
WaitCompletionError::NotInitialized(NotInitialized::Uninitialized) => {
|
||||
FlushLayerError::Other(anyhow!(e).into())
|
||||
}
|
||||
})?;
|
||||
|
||||
// FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
|
||||
// a compaction can delete the file and then it won't be available for uploads any more.
|
||||
// We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this
|
||||
|
||||
@@ -1943,11 +1943,15 @@ class NeonCli(AbstractNeonCli):
|
||||
remote_ext_config: Optional[str] = None,
|
||||
pageserver_id: Optional[int] = None,
|
||||
allow_multiple=False,
|
||||
basebackup_request_tries: Optional[int] = None,
|
||||
) -> "subprocess.CompletedProcess[str]":
|
||||
args = [
|
||||
"endpoint",
|
||||
"start",
|
||||
]
|
||||
extra_env_vars = {}
|
||||
if basebackup_request_tries is not None:
|
||||
extra_env_vars["NEON_COMPUTE_TESTING_BASEBACKUP_TRIES"] = str(basebackup_request_tries)
|
||||
if remote_ext_config is not None:
|
||||
args.extend(["--remote-ext-config", remote_ext_config])
|
||||
|
||||
@@ -1960,7 +1964,7 @@ class NeonCli(AbstractNeonCli):
|
||||
if allow_multiple:
|
||||
args.extend(["--allow-multiple"])
|
||||
|
||||
res = self.raw_cli(args)
|
||||
res = self.raw_cli(args, extra_env_vars)
|
||||
res.check_returncode()
|
||||
return res
|
||||
|
||||
@@ -3812,6 +3816,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
pageserver_id: Optional[int] = None,
|
||||
safekeepers: Optional[List[int]] = None,
|
||||
allow_multiple: bool = False,
|
||||
basebackup_request_tries: Optional[int] = None,
|
||||
) -> "Endpoint":
|
||||
"""
|
||||
Start the Postgres instance.
|
||||
@@ -3833,6 +3838,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
remote_ext_config=remote_ext_config,
|
||||
pageserver_id=pageserver_id,
|
||||
allow_multiple=allow_multiple,
|
||||
basebackup_request_tries=basebackup_request_tries,
|
||||
)
|
||||
self._running.release(1)
|
||||
|
||||
@@ -3979,6 +3985,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
remote_ext_config: Optional[str] = None,
|
||||
pageserver_id: Optional[int] = None,
|
||||
allow_multiple=False,
|
||||
basebackup_request_tries: Optional[int] = None,
|
||||
) -> "Endpoint":
|
||||
"""
|
||||
Create an endpoint, apply config, and start Postgres.
|
||||
@@ -3999,6 +4006,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
remote_ext_config=remote_ext_config,
|
||||
pageserver_id=pageserver_id,
|
||||
allow_multiple=allow_multiple,
|
||||
basebackup_request_tries=basebackup_request_tries,
|
||||
)
|
||||
|
||||
log.info(f"Postgres startup took {time.time() - started_at} seconds")
|
||||
@@ -4042,6 +4050,7 @@ class EndpointFactory:
|
||||
config_lines: Optional[List[str]] = None,
|
||||
remote_ext_config: Optional[str] = None,
|
||||
pageserver_id: Optional[int] = None,
|
||||
basebackup_request_tries: Optional[int] = None,
|
||||
) -> Endpoint:
|
||||
ep = Endpoint(
|
||||
self.env,
|
||||
@@ -4060,6 +4069,7 @@ class EndpointFactory:
|
||||
lsn=lsn,
|
||||
remote_ext_config=remote_ext_config,
|
||||
pageserver_id=pageserver_id,
|
||||
basebackup_request_tries=basebackup_request_tries,
|
||||
)
|
||||
|
||||
def create(
|
||||
|
||||
@@ -663,6 +663,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
force_image_layer_creation=False,
|
||||
wait_until_uploaded=False,
|
||||
compact: Optional[bool] = None,
|
||||
**kwargs,
|
||||
):
|
||||
self.is_testing_enabled_or_skip()
|
||||
query = {}
|
||||
@@ -680,6 +681,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
res = self.put(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint",
|
||||
params=query,
|
||||
**kwargs,
|
||||
)
|
||||
log.info(f"Got checkpoint request response code: {res.status_code}")
|
||||
self.verbose_error(res)
|
||||
|
||||
@@ -18,7 +18,6 @@ from fixtures.pageserver.utils import wait_until_tenant_active
|
||||
from fixtures.utils import query_scalar
|
||||
from performance.test_perf_pgbench import get_scales_matrix
|
||||
from requests import RequestException
|
||||
from requests.exceptions import RetryError
|
||||
|
||||
|
||||
# Test branch creation
|
||||
@@ -151,7 +150,7 @@ def test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonE
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*",
|
||||
".*page_service_conn_main.*: query handler for 'basebackup .* is not active, state: Loading",
|
||||
".*page_service_conn_main.*: query handler for 'basebackup .* ERROR: Not found: Timeline",
|
||||
]
|
||||
)
|
||||
ps_http = env.pageserver.http_client()
|
||||
@@ -176,10 +175,12 @@ def test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonE
|
||||
|
||||
env.neon_cli.map_branch(initial_branch, env.initial_tenant, env.initial_timeline)
|
||||
|
||||
with pytest.raises(RuntimeError, match="is not active, state: Loading"):
|
||||
env.endpoints.create_start(initial_branch, tenant_id=env.initial_tenant)
|
||||
with pytest.raises(RuntimeError, match="ERROR: Not found: Timeline"):
|
||||
env.endpoints.create_start(
|
||||
initial_branch, tenant_id=env.initial_tenant, basebackup_request_tries=2
|
||||
)
|
||||
ps_http.configure_failpoints(("before-upload-index-pausable", "off"))
|
||||
finally:
|
||||
# FIXME: paused uploads bother shutdown
|
||||
env.pageserver.stop(immediate=True)
|
||||
|
||||
t.join()
|
||||
@@ -193,8 +194,11 @@ def test_cannot_branch_from_non_uploaded_branch(neon_env_builder: NeonEnvBuilder
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*",
|
||||
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: .*Cannot branch off the timeline that's not present in pageserver.*",
|
||||
]
|
||||
)
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
@@ -216,7 +220,10 @@ def test_cannot_branch_from_non_uploaded_branch(neon_env_builder: NeonEnvBuilder
|
||||
|
||||
branch_id = TimelineId.generate()
|
||||
|
||||
with pytest.raises(RetryError, match="too many 503 error responses"):
|
||||
with pytest.raises(
|
||||
PageserverApiException,
|
||||
match="Cannot branch off the timeline that's not present in pageserver",
|
||||
):
|
||||
ps_http.timeline_create(
|
||||
env.pg_version,
|
||||
env.initial_tenant,
|
||||
|
||||
@@ -12,7 +12,6 @@ from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
from fixtures.pageserver.common_types import parse_layer_file_name
|
||||
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
|
||||
from fixtures.pageserver.utils import (
|
||||
timeline_delete_wait_completed,
|
||||
@@ -313,6 +312,7 @@ def test_remote_storage_upload_queue_retries(
|
||||
|
||||
def churn_while_failpoints_active(result):
|
||||
overwrite_data_and_wait_for_it_to_arrive_at_pageserver("c")
|
||||
# this call will wait for the failpoints to be turned off
|
||||
client.timeline_checkpoint(tenant_id, timeline_id)
|
||||
client.timeline_compact(tenant_id, timeline_id)
|
||||
overwrite_data_and_wait_for_it_to_arrive_at_pageserver("d")
|
||||
@@ -332,8 +332,8 @@ def test_remote_storage_upload_queue_retries(
|
||||
# Exponential back-off in upload queue, so, gracious timeouts.
|
||||
|
||||
wait_until(30, 1, lambda: assert_gt(get_queued_count(file_kind="layer", op_kind="upload"), 0))
|
||||
wait_until(30, 1, lambda: assert_ge(get_queued_count(file_kind="index", op_kind="upload"), 2))
|
||||
wait_until(30, 1, lambda: assert_gt(get_queued_count(file_kind="layer", op_kind="delete"), 0))
|
||||
wait_until(30, 1, lambda: assert_ge(get_queued_count(file_kind="index", op_kind="upload"), 1))
|
||||
wait_until(30, 1, lambda: assert_eq(get_queued_count(file_kind="layer", op_kind="delete"), 0))
|
||||
|
||||
# unblock churn operations
|
||||
configure_storage_sync_failpoints("off")
|
||||
@@ -769,11 +769,11 @@ def test_empty_branch_remote_storage_upload_on_restart(neon_env_builder: NeonEnv
|
||||
create_thread.join()
|
||||
|
||||
|
||||
def test_compaction_waits_for_upload(
|
||||
def test_paused_upload_stalls_checkpoint(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
"""
|
||||
This test forces a race between upload and compaction.
|
||||
This test checks that checkpoints block on uploads to remote storage.
|
||||
"""
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
@@ -788,6 +788,10 @@ def test_compaction_waits_for_upload(
|
||||
}
|
||||
)
|
||||
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*PUT.* path=/v1/tenant/{env.initial_tenant}/timeline.* request was dropped before completing"
|
||||
)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
@@ -808,76 +812,9 @@ def test_compaction_waits_for_upload(
|
||||
endpoint.safe_psql("CREATE TABLE foo AS SELECT x FROM generate_series(1, 10000) g(x)")
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
client.timeline_checkpoint(tenant_id, timeline_id)
|
||||
deltas_at_first = len(client.layer_map_info(tenant_id, timeline_id).delta_layers())
|
||||
assert (
|
||||
deltas_at_first == 2
|
||||
), "are you fixing #5863? just add one more checkpoint after 'CREATE TABLE bar ...' statement."
|
||||
|
||||
endpoint.safe_psql("CREATE TABLE bar AS SELECT x FROM generate_series(1, 10000) g(x)")
|
||||
endpoint.safe_psql("UPDATE foo SET x = 0 WHERE x = 1")
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
layers_before_last_checkpoint = client.layer_map_info(tenant_id, timeline_id).historic_by_name()
|
||||
upload_stuck_layers = layers_before_last_checkpoint - layers_at_creation.historic_by_name()
|
||||
|
||||
assert len(upload_stuck_layers) > 0
|
||||
|
||||
for name in upload_stuck_layers:
|
||||
assert env.pageserver.layer_exists(
|
||||
tenant_id, timeline_id, parse_layer_file_name(name)
|
||||
), "while uploads are stuck the layers should be present on disk"
|
||||
|
||||
# now this will do the L0 => L1 compaction and want to remove
|
||||
# upload_stuck_layers and the original initdb L0
|
||||
client.timeline_checkpoint(tenant_id, timeline_id)
|
||||
|
||||
# as uploads are paused, the upload_stuck_layers should still be with us
|
||||
for name in upload_stuck_layers:
|
||||
assert env.pageserver.layer_exists(
|
||||
tenant_id, timeline_id, parse_layer_file_name(name)
|
||||
), "uploads are stuck still over compaction"
|
||||
|
||||
compacted_layers = client.layer_map_info(tenant_id, timeline_id).historic_by_name()
|
||||
overlap = compacted_layers.intersection(upload_stuck_layers)
|
||||
assert len(overlap) == 0, "none of the L0's should remain after L0 => L1 compaction"
|
||||
assert (
|
||||
len(compacted_layers) == 1
|
||||
), "there should be one L1 after L0 => L1 compaction (without #5863 being fixed)"
|
||||
|
||||
def layer_deletes_completed():
|
||||
m = client.get_metric_value("pageserver_layer_completed_deletes_total")
|
||||
if m is None:
|
||||
return 0
|
||||
return int(m)
|
||||
|
||||
# if initdb created an initial delta layer, it might already be gc'd
|
||||
# because it was uploaded before the failpoint was enabled. however, the
|
||||
# deletion is not guaranteed to be complete.
|
||||
assert layer_deletes_completed() <= 1
|
||||
|
||||
client.configure_failpoints(("before-upload-layer-pausable", "off"))
|
||||
|
||||
# Ensure that this actually terminates
|
||||
wait_upload_queue_empty(client, tenant_id, timeline_id)
|
||||
|
||||
def until_layer_deletes_completed():
|
||||
deletes = layer_deletes_completed()
|
||||
log.info(f"layer_deletes: {deletes}")
|
||||
# ensure that initdb delta layer AND the previously stuck are now deleted
|
||||
assert deletes >= len(upload_stuck_layers) + 1
|
||||
|
||||
wait_until(10, 1, until_layer_deletes_completed)
|
||||
|
||||
for name in upload_stuck_layers:
|
||||
assert not env.pageserver.layer_exists(
|
||||
tenant_id, timeline_id, parse_layer_file_name(name)
|
||||
), "l0 should now be removed because of L0 => L1 compaction and completed uploads"
|
||||
|
||||
# We should not have hit the error handling path in uploads where a uploaded file is gone
|
||||
assert not env.pageserver.log_contains(
|
||||
"File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more."
|
||||
)
|
||||
with pytest.raises(ReadTimeout):
|
||||
client.timeline_checkpoint(tenant_id, timeline_id, timeout=5)
|
||||
client.configure_failpoints(("before-upload-layer-pausable", "off"))
|
||||
|
||||
|
||||
def wait_upload_queue_empty(
|
||||
|
||||
Reference in New Issue
Block a user