diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index 8ceb8f2ad2..8af0ed43ce 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -4,6 +4,11 @@ version = "0.1.0" edition.workspace = true license.workspace = true +[features] +default = [] +# Enables test specific features. +testing = [] + [dependencies] anyhow.workspace = true async-compression.workspace = true diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 91855d954d..5bd6897fe3 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -400,7 +400,15 @@ impl ComputeNode { pub fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> { let mut retry_period_ms = 500.0; let mut attempts = 0; - let max_attempts = 10; + const DEFAULT_ATTEMPTS: u16 = 10; + #[cfg(feature = "testing")] + let max_attempts = if let Ok(v) = env::var("NEON_COMPUTE_TESTING_BASEBACKUP_RETRIES") { + u16::from_str(&v).unwrap() + } else { + DEFAULT_ATTEMPTS + }; + #[cfg(not(feature = "testing"))] + let max_attempts = DEFAULT_ATTEMPTS; loop { let result = self.try_get_basebackup(compute_state, lsn); match result { diff --git a/control_plane/src/background_process.rs b/control_plane/src/background_process.rs index a272c306e7..bf8a27e550 100644 --- a/control_plane/src/background_process.rs +++ b/control_plane/src/background_process.rs @@ -289,7 +289,7 @@ fn fill_remote_storage_secrets_vars(mut cmd: &mut Command) -> &mut Command { fn fill_env_vars_prefixed_neon(mut cmd: &mut Command) -> &mut Command { for (var, val) in std::env::vars() { - if var.starts_with("NEON_PAGESERVER_") { + if var.starts_with("NEON_") { cmd = cmd.env(var, val); } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 37ebeded66..be72e15c19 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -143,7 +143,10 @@ use self::walreceiver::{WalReceiver, WalReceiverConf}; use super::{config::TenantConf, upload_queue::NotInitialized}; use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf}; use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe}; -use super::{remote_timeline_client::RemoteTimelineClient, storage_layer::ReadableLayer}; +use super::{ + remote_timeline_client::RemoteTimelineClient, remote_timeline_client::WaitCompletionError, + storage_layer::ReadableLayer, +}; use super::{ secondary::heatmap::{HeatMapLayer, HeatMapTimeline}, GcError, @@ -4089,6 +4092,21 @@ impl Timeline { // release lock on 'layers' }; + // Backpressure mechanism: wait with continuation of the flush loop until we have uploaded all layer files. + // This makes us refuse ingest until the new layers have been persisted to the remote. + self.remote_client + .wait_completion() + .await + .map_err(|e| match e { + WaitCompletionError::UploadQueueShutDownOrStopped + | WaitCompletionError::NotInitialized( + NotInitialized::ShuttingDown | NotInitialized::Stopped, + ) => FlushLayerError::Cancelled, + WaitCompletionError::NotInitialized(NotInitialized::Uninitialized) => { + FlushLayerError::Other(anyhow!(e).into()) + } + })?; + // FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`, // a compaction can delete the file and then it won't be available for uploads any more. // We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index b370a92e38..7289472de2 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1943,11 +1943,15 @@ class NeonCli(AbstractNeonCli): remote_ext_config: Optional[str] = None, pageserver_id: Optional[int] = None, allow_multiple=False, + basebackup_request_tries: Optional[int] = None, ) -> "subprocess.CompletedProcess[str]": args = [ "endpoint", "start", ] + extra_env_vars = {} + if basebackup_request_tries is not None: + extra_env_vars["NEON_COMPUTE_TESTING_BASEBACKUP_TRIES"] = str(basebackup_request_tries) if remote_ext_config is not None: args.extend(["--remote-ext-config", remote_ext_config]) @@ -1960,7 +1964,7 @@ class NeonCli(AbstractNeonCli): if allow_multiple: args.extend(["--allow-multiple"]) - res = self.raw_cli(args) + res = self.raw_cli(args, extra_env_vars) res.check_returncode() return res @@ -3812,6 +3816,7 @@ class Endpoint(PgProtocol, LogUtils): pageserver_id: Optional[int] = None, safekeepers: Optional[List[int]] = None, allow_multiple: bool = False, + basebackup_request_tries: Optional[int] = None, ) -> "Endpoint": """ Start the Postgres instance. @@ -3833,6 +3838,7 @@ class Endpoint(PgProtocol, LogUtils): remote_ext_config=remote_ext_config, pageserver_id=pageserver_id, allow_multiple=allow_multiple, + basebackup_request_tries=basebackup_request_tries, ) self._running.release(1) @@ -3979,6 +3985,7 @@ class Endpoint(PgProtocol, LogUtils): remote_ext_config: Optional[str] = None, pageserver_id: Optional[int] = None, allow_multiple=False, + basebackup_request_tries: Optional[int] = None, ) -> "Endpoint": """ Create an endpoint, apply config, and start Postgres. @@ -3999,6 +4006,7 @@ class Endpoint(PgProtocol, LogUtils): remote_ext_config=remote_ext_config, pageserver_id=pageserver_id, allow_multiple=allow_multiple, + basebackup_request_tries=basebackup_request_tries, ) log.info(f"Postgres startup took {time.time() - started_at} seconds") @@ -4042,6 +4050,7 @@ class EndpointFactory: config_lines: Optional[List[str]] = None, remote_ext_config: Optional[str] = None, pageserver_id: Optional[int] = None, + basebackup_request_tries: Optional[int] = None, ) -> Endpoint: ep = Endpoint( self.env, @@ -4060,6 +4069,7 @@ class EndpointFactory: lsn=lsn, remote_ext_config=remote_ext_config, pageserver_id=pageserver_id, + basebackup_request_tries=basebackup_request_tries, ) def create( diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index c6df6b5baf..192324f086 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -663,6 +663,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter): force_image_layer_creation=False, wait_until_uploaded=False, compact: Optional[bool] = None, + **kwargs, ): self.is_testing_enabled_or_skip() query = {} @@ -680,6 +681,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter): res = self.put( f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint", params=query, + **kwargs, ) log.info(f"Got checkpoint request response code: {res.status_code}") self.verbose_error(res) diff --git a/test_runner/regress/test_branching.py b/test_runner/regress/test_branching.py index 190b624a54..fc74707639 100644 --- a/test_runner/regress/test_branching.py +++ b/test_runner/regress/test_branching.py @@ -18,7 +18,6 @@ from fixtures.pageserver.utils import wait_until_tenant_active from fixtures.utils import query_scalar from performance.test_perf_pgbench import get_scales_matrix from requests import RequestException -from requests.exceptions import RetryError # Test branch creation @@ -151,7 +150,7 @@ def test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonE env.pageserver.allowed_errors.extend( [ ".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*", - ".*page_service_conn_main.*: query handler for 'basebackup .* is not active, state: Loading", + ".*page_service_conn_main.*: query handler for 'basebackup .* ERROR: Not found: Timeline", ] ) ps_http = env.pageserver.http_client() @@ -176,10 +175,12 @@ def test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonE env.neon_cli.map_branch(initial_branch, env.initial_tenant, env.initial_timeline) - with pytest.raises(RuntimeError, match="is not active, state: Loading"): - env.endpoints.create_start(initial_branch, tenant_id=env.initial_tenant) + with pytest.raises(RuntimeError, match="ERROR: Not found: Timeline"): + env.endpoints.create_start( + initial_branch, tenant_id=env.initial_tenant, basebackup_request_tries=2 + ) + ps_http.configure_failpoints(("before-upload-index-pausable", "off")) finally: - # FIXME: paused uploads bother shutdown env.pageserver.stop(immediate=True) t.join() @@ -193,8 +194,11 @@ def test_cannot_branch_from_non_uploaded_branch(neon_env_builder: NeonEnvBuilder env = neon_env_builder.init_configs() env.start() - env.pageserver.allowed_errors.append( - ".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*" + env.pageserver.allowed_errors.extend( + [ + ".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*", + ".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: .*Cannot branch off the timeline that's not present in pageserver.*", + ] ) ps_http = env.pageserver.http_client() @@ -216,7 +220,10 @@ def test_cannot_branch_from_non_uploaded_branch(neon_env_builder: NeonEnvBuilder branch_id = TimelineId.generate() - with pytest.raises(RetryError, match="too many 503 error responses"): + with pytest.raises( + PageserverApiException, + match="Cannot branch off the timeline that's not present in pageserver", + ): ps_http.timeline_create( env.pg_version, env.initial_tenant, diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index 09f941f582..2e5260ca78 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -12,7 +12,6 @@ from fixtures.neon_fixtures import ( NeonEnvBuilder, wait_for_last_flush_lsn, ) -from fixtures.pageserver.common_types import parse_layer_file_name from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient from fixtures.pageserver.utils import ( timeline_delete_wait_completed, @@ -313,6 +312,7 @@ def test_remote_storage_upload_queue_retries( def churn_while_failpoints_active(result): overwrite_data_and_wait_for_it_to_arrive_at_pageserver("c") + # this call will wait for the failpoints to be turned off client.timeline_checkpoint(tenant_id, timeline_id) client.timeline_compact(tenant_id, timeline_id) overwrite_data_and_wait_for_it_to_arrive_at_pageserver("d") @@ -332,8 +332,8 @@ def test_remote_storage_upload_queue_retries( # Exponential back-off in upload queue, so, gracious timeouts. wait_until(30, 1, lambda: assert_gt(get_queued_count(file_kind="layer", op_kind="upload"), 0)) - wait_until(30, 1, lambda: assert_ge(get_queued_count(file_kind="index", op_kind="upload"), 2)) - wait_until(30, 1, lambda: assert_gt(get_queued_count(file_kind="layer", op_kind="delete"), 0)) + wait_until(30, 1, lambda: assert_ge(get_queued_count(file_kind="index", op_kind="upload"), 1)) + wait_until(30, 1, lambda: assert_eq(get_queued_count(file_kind="layer", op_kind="delete"), 0)) # unblock churn operations configure_storage_sync_failpoints("off") @@ -769,11 +769,11 @@ def test_empty_branch_remote_storage_upload_on_restart(neon_env_builder: NeonEnv create_thread.join() -def test_compaction_waits_for_upload( +def test_paused_upload_stalls_checkpoint( neon_env_builder: NeonEnvBuilder, ): """ - This test forces a race between upload and compaction. + This test checks that checkpoints block on uploads to remote storage. """ neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS) @@ -788,6 +788,10 @@ def test_compaction_waits_for_upload( } ) + env.pageserver.allowed_errors.append( + f".*PUT.* path=/v1/tenant/{env.initial_tenant}/timeline.* request was dropped before completing" + ) + tenant_id = env.initial_tenant timeline_id = env.initial_timeline @@ -808,76 +812,9 @@ def test_compaction_waits_for_upload( endpoint.safe_psql("CREATE TABLE foo AS SELECT x FROM generate_series(1, 10000) g(x)") wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id) - client.timeline_checkpoint(tenant_id, timeline_id) - deltas_at_first = len(client.layer_map_info(tenant_id, timeline_id).delta_layers()) - assert ( - deltas_at_first == 2 - ), "are you fixing #5863? just add one more checkpoint after 'CREATE TABLE bar ...' statement." - - endpoint.safe_psql("CREATE TABLE bar AS SELECT x FROM generate_series(1, 10000) g(x)") - endpoint.safe_psql("UPDATE foo SET x = 0 WHERE x = 1") - wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id) - - layers_before_last_checkpoint = client.layer_map_info(tenant_id, timeline_id).historic_by_name() - upload_stuck_layers = layers_before_last_checkpoint - layers_at_creation.historic_by_name() - - assert len(upload_stuck_layers) > 0 - - for name in upload_stuck_layers: - assert env.pageserver.layer_exists( - tenant_id, timeline_id, parse_layer_file_name(name) - ), "while uploads are stuck the layers should be present on disk" - - # now this will do the L0 => L1 compaction and want to remove - # upload_stuck_layers and the original initdb L0 - client.timeline_checkpoint(tenant_id, timeline_id) - - # as uploads are paused, the upload_stuck_layers should still be with us - for name in upload_stuck_layers: - assert env.pageserver.layer_exists( - tenant_id, timeline_id, parse_layer_file_name(name) - ), "uploads are stuck still over compaction" - - compacted_layers = client.layer_map_info(tenant_id, timeline_id).historic_by_name() - overlap = compacted_layers.intersection(upload_stuck_layers) - assert len(overlap) == 0, "none of the L0's should remain after L0 => L1 compaction" - assert ( - len(compacted_layers) == 1 - ), "there should be one L1 after L0 => L1 compaction (without #5863 being fixed)" - - def layer_deletes_completed(): - m = client.get_metric_value("pageserver_layer_completed_deletes_total") - if m is None: - return 0 - return int(m) - - # if initdb created an initial delta layer, it might already be gc'd - # because it was uploaded before the failpoint was enabled. however, the - # deletion is not guaranteed to be complete. - assert layer_deletes_completed() <= 1 - - client.configure_failpoints(("before-upload-layer-pausable", "off")) - - # Ensure that this actually terminates - wait_upload_queue_empty(client, tenant_id, timeline_id) - - def until_layer_deletes_completed(): - deletes = layer_deletes_completed() - log.info(f"layer_deletes: {deletes}") - # ensure that initdb delta layer AND the previously stuck are now deleted - assert deletes >= len(upload_stuck_layers) + 1 - - wait_until(10, 1, until_layer_deletes_completed) - - for name in upload_stuck_layers: - assert not env.pageserver.layer_exists( - tenant_id, timeline_id, parse_layer_file_name(name) - ), "l0 should now be removed because of L0 => L1 compaction and completed uploads" - - # We should not have hit the error handling path in uploads where a uploaded file is gone - assert not env.pageserver.log_contains( - "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more." - ) + with pytest.raises(ReadTimeout): + client.timeline_checkpoint(tenant_id, timeline_id, timeout=5) + client.configure_failpoints(("before-upload-layer-pausable", "off")) def wait_upload_queue_empty(