diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs
index d0611113e8..d34dd39f61 100644
--- a/control_plane/src/local_env.rs
+++ b/control_plane/src/local_env.rs
@@ -217,6 +217,9 @@ pub struct NeonStorageControllerConf {
     pub posthog_config: Option,
     pub kick_secondary_downloads: Option<bool>,
+
+    #[serde(with = "humantime_serde")]
+    pub shard_split_request_timeout: Option<Duration>,
 }
 
 impl NeonStorageControllerConf {
@@ -250,6 +253,7 @@ impl Default for NeonStorageControllerConf {
             timeline_safekeeper_count: None,
             posthog_config: None,
             kick_secondary_downloads: None,
+            shard_split_request_timeout: None,
         }
     }
 }
diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs
index dc6c82f504..f996f39967 100644
--- a/control_plane/src/storage_controller.rs
+++ b/control_plane/src/storage_controller.rs
@@ -648,6 +648,13 @@ impl StorageController {
             args.push(format!("--timeline-safekeeper-count={sk_cnt}"));
         }
 
+        if let Some(duration) = self.config.shard_split_request_timeout {
+            args.push(format!(
+                "--shard-split-request-timeout={}",
+                humantime::Duration::from(duration)
+            ));
+        }
+
         let mut envs = vec![
             ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
             ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
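The new `shard_split_request_timeout` value travels as a human-readable duration: `humantime_serde` parses it from the `neon_local` config, and `humantime::Duration` formats it back onto the storage controller's CLI flag. A minimal standalone sketch of that round-trip, assuming only the `humantime` crate (not part of the patch):

```rust
use std::time::Duration;

fn main() {
    // What `#[serde(with = "humantime_serde")]` accepts in the config,
    // e.g. `shard_split_request_timeout = "30s"`.
    let parsed: Duration = humantime::parse_duration("30s").expect("valid duration");
    assert_eq!(parsed, Duration::from_secs(30));

    // What `humantime::Duration::from(duration)` renders when the value is
    // forwarded to the storage controller binary.
    let flag = format!(
        "--shard-split-request-timeout={}",
        humantime::Duration::from(parsed)
    );
    assert_eq!(flag, "--shard-split-request-timeout=30s");
}
```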
diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs
index 30d116f57c..e895380192 100644
--- a/libs/remote_storage/src/simulate_failures.rs
+++ b/libs/remote_storage/src/simulate_failures.rs
@@ -31,6 +31,7 @@ pub struct UnreliableWrapper {
     /* BEGIN_HADRON */
     // This the probability of failure for each operation, ranged from [0, 100].
     // The probability is default to 100, which means that all operations will fail.
+    // Each operation fails with this probability, for up to attempts_to_fail attempts.
     attempt_failure_probability: u64,
     /* END_HADRON */
 }
diff --git a/libs/utils/src/env.rs b/libs/utils/src/env.rs
index cc1cbf8009..0b3b5e6c4f 100644
--- a/libs/utils/src/env.rs
+++ b/libs/utils/src/env.rs
@@ -47,6 +47,7 @@ where
 
 /* BEGIN_HADRON */
 pub enum DeploymentMode {
+    Local,
     Dev,
     Staging,
     Prod,
@@ -64,7 +65,7 @@ pub fn get_deployment_mode() -> Option<DeploymentMode> {
             }
         },
         Err(_) => {
-            tracing::error!("DEPLOYMENT_MODE not set");
+            // tracing::error!("DEPLOYMENT_MODE not set");
            None
         }
     }
diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs
index 299fe7e159..dfb8b437c3 100644
--- a/pageserver/src/bin/pageserver.rs
+++ b/pageserver/src/bin/pageserver.rs
@@ -917,11 +917,6 @@ async fn create_remote_storage_client(
     // If `test_remote_failures` is non-zero, wrap the client with a
     // wrapper that simulates failures.
     if conf.test_remote_failures > 0 {
-        if !cfg!(feature = "testing") {
-            anyhow::bail!(
-                "test_remote_failures option is not available because pageserver was compiled without the 'testing' feature"
-            );
-        }
         info!(
             "Simulating remote failures for first {} attempts of each op",
             conf.test_remote_failures
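For context on the `attempt_failure_probability` field documented above: each eligible attempt fails with the configured probability (0 to 100) until `attempts_to_fail` attempts have been made. A simplified sketch of that decision, using a hypothetical `should_fail` helper rather than the real `UnreliableWrapper` code (assumes the `rand` crate):

```rust
use rand::Rng;

/// Hypothetical helper: decides whether to inject a simulated failure.
/// `attempt` is how many times this operation has already been attempted.
fn should_fail(attempt: u64, attempts_to_fail: u64, attempt_failure_probability: u64) -> bool {
    if attempt >= attempts_to_fail {
        // The failure budget for this operation is exhausted; let it succeed.
        return false;
    }
    // Roll in [0, 100); with the default probability of 100, every eligible attempt fails.
    rand::thread_rng().gen_range(0..100u64) < attempt_failure_probability
}

fn main() {
    // Probability 100: the first `attempts_to_fail` attempts always fail.
    assert!(should_fail(0, 1, 100));
    assert!(!should_fail(1, 1, 100));
    // Probability 0: nothing ever fails.
    assert!(!should_fail(0, 1, 0));
}
```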
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index d839bac557..0d40c5ecf7 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -4183,7 +4183,7 @@ pub fn make_router(
         })
         .get(
             "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/getpage",
-            |r| testing_api_handler("getpage@lsn", r, getpage_at_lsn_handler),
+            |r| testing_api_handler("getpage@lsn", r, getpage_at_lsn_handler),
         )
         .get(
             "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/touchpage",
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index 15853d3614..52f67abde5 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -1678,6 +1678,8 @@ impl TenantManager {
         // Phase 6: Release the InProgress on the parent shard
         drop(parent_slot_guard);
 
+        utils::pausable_failpoint!("shard-split-post-finish-pause");
+
         Ok(child_shards)
     }
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 718ea925b7..fe622713e9 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -5604,10 +5604,11 @@ impl Timeline {
     /// Predicate function which indicates whether we should check if new image layers
     /// are required. Since checking if new image layers are required is expensive in
     /// terms of CPU, we only do it in the following cases:
-    /// 1. If the timeline has ingested sufficient WAL to justify the cost
+    /// 1. If the timeline has ingested sufficient WAL to justify the cost, or
     /// 2. If enough time has passed since the last check:
     ///    1. For large tenants, we wish to perform the check more often since they
-    ///       suffer from the lack of image layers
+    ///       suffer from the lack of image layers. Note that we assume sharded tenants
+    ///       to be large since non-zero shards do not track the logical size.
     ///    2. For small tenants (that can mostly fit in RAM), we use a much longer interval
     fn should_check_if_image_layers_required(self: &Arc<Self>, lsn: Lsn) -> bool {
         let large_timeline_threshold = self.conf.image_layer_generation_large_timeline_threshold;
@@ -5621,30 +5622,39 @@ impl Timeline {
         let distance_based_decision = distance.0 >= min_distance;
 
-        let mut time_based_decision = false;
         let mut last_check_instant = self.last_image_layer_creation_check_instant.lock().unwrap();
-        if let CurrentLogicalSize::Exact(logical_size) = self.current_logical_size.current_size() {
-            let check_required_after =
-                if Some(Into::<u64>::into(&logical_size)) >= large_timeline_threshold {
-                    self.get_checkpoint_timeout()
-                } else {
-                    Duration::from_secs(3600 * 48)
-                };
-
-            time_based_decision = match *last_check_instant {
-                Some(last_check) => {
-                    let elapsed = last_check.elapsed();
-                    elapsed >= check_required_after
+        let check_required_after = (|| {
+            if self.shard_identity.is_unsharded() {
+                if let CurrentLogicalSize::Exact(logical_size) =
+                    self.current_logical_size.current_size()
+                {
+                    if Some(Into::<u64>::into(&logical_size)) < large_timeline_threshold {
+                        return Duration::from_secs(3600 * 48);
+                    }
                 }
-                None => true,
-            };
-        }
+            }
+
+            self.get_checkpoint_timeout()
+        })();
+
+        let time_based_decision = match *last_check_instant {
+            Some(last_check) => {
+                let elapsed = last_check.elapsed();
+                elapsed >= check_required_after
+            }
+            None => true,
+        };
 
         // Do the expensive delta layer counting only if this timeline has ingested sufficient
         // WAL since the last check or a checkpoint timeout interval has elapsed since the last
         // check.
         let decision = distance_based_decision || time_based_decision;
-
+        tracing::info!(
+            "Decided to check image layers: {}. Distance-based decision: {}, time-based decision: {}",
+            decision,
+            distance_based_decision,
+            time_based_decision
+        );
         if decision {
             self.last_image_layer_creation_check_at.store(lsn);
             *last_check_instant = Some(Instant::now());
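The effect of the `timeline.rs` change is easier to see outside the `Timeline` impl: only an unsharded timeline whose exact logical size is known and below the threshold keeps the long 48-hour interval; sharded timelines (non-zero shards do not track logical size) and large or unknown-size timelines fall back to the checkpoint timeout. A simplified, self-contained restatement of that rule, with plain arguments standing in for the `Timeline` fields (sketch only, not the patch's code):

```rust
use std::time::Duration;

/// Simplified restatement of the interval choice in `should_check_if_image_layers_required`.
/// `exact_logical_size` is `Some(..)` only when the timeline tracks an exact logical size.
fn check_required_after(
    is_unsharded: bool,
    exact_logical_size: Option<u64>,
    large_timeline_threshold: Option<u64>,
    checkpoint_timeout: Duration,
) -> Duration {
    if is_unsharded {
        if let Some(logical_size) = exact_logical_size {
            if Some(logical_size) < large_timeline_threshold {
                // Small, unsharded timeline: check rarely.
                return Duration::from_secs(3600 * 48);
            }
        }
    }
    // Sharded, large, or unknown size: check once per checkpoint timeout.
    checkpoint_timeout
}

fn main() {
    let timeout = Duration::from_secs(600);
    // Non-zero shards have no exact logical size, so they get the short interval.
    assert_eq!(check_required_after(false, None, Some(1 << 30), timeout), timeout);
    // A small unsharded timeline keeps the 48-hour interval.
    assert_eq!(
        check_required_after(true, Some(1024), Some(1 << 30), timeout),
        Duration::from_secs(3600 * 48)
    );
}
```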
diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs
index 2a851dc25b..5d21feeb10 100644
--- a/storage_controller/src/main.rs
+++ b/storage_controller/src/main.rs
@@ -222,6 +222,10 @@ struct Cli {
     /// Primarily useful for testing to reduce test execution time.
     #[arg(long, default_value = "false", action=ArgAction::Set)]
     kick_secondary_downloads: bool,
+
+    /// Timeout for shard split requests sent to pageservers, e.g. "30s". No timeout if unset.
+    #[arg(long)]
+    shard_split_request_timeout: Option<humantime::Duration>,
 }
 
 enum StrictMode {
@@ -470,6 +473,10 @@ async fn async_main() -> anyhow::Result<()> {
         timeline_safekeeper_count: args.timeline_safekeeper_count,
         posthog_config: posthog_config.clone(),
         kick_secondary_downloads: args.kick_secondary_downloads,
+        shard_split_request_timeout: args
+            .shard_split_request_timeout
+            .map(humantime::Duration::into)
+            .unwrap_or(Duration::MAX),
     };
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 0907907edc..638cb410fa 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -60,6 +60,7 @@ use tokio::sync::mpsc::error::TrySendError;
 use tokio_util::sync::CancellationToken;
 use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
 use utils::completion::Barrier;
+use utils::env;
 use utils::generation::Generation;
 use utils::id::{NodeId, TenantId, TimelineId};
 use utils::lsn::Lsn;
@@ -483,6 +484,9 @@ pub struct Config {
     /// When set, actively checks and initiates heatmap downloads/uploads.
     pub kick_secondary_downloads: bool,
+
+    /// Timeout used by the HTTP client for shard split requests; [`Duration::MAX`] if not configured.
+    pub shard_split_request_timeout: Duration,
 }
 
 impl From for ApiError {
@@ -6406,18 +6410,39 @@ impl Service {
         // TODO: issue split calls concurrently (this only matters once we're splitting
         // N>1 shards into M shards -- initially we're usually splitting 1 shard into N).
+        // HADRON: set a timeout for splitting individual shards on page servers.
+        // Currently we do not perform any retries because it is not clear whether the page
+        // server can handle partially split shards correctly.
+        let shard_split_timeout =
+            if let Some(env::DeploymentMode::Local) = env::get_deployment_mode() {
+                Duration::from_secs(30)
+            } else {
+                self.config.shard_split_request_timeout
+            };
+        let mut http_client_builder = reqwest::ClientBuilder::new()
+            .pool_max_idle_per_host(0)
+            .timeout(shard_split_timeout);
+
+        for ssl_ca_cert in &self.config.ssl_ca_certs {
+            http_client_builder = http_client_builder.add_root_certificate(ssl_ca_cert.clone());
+        }
+        let http_client = http_client_builder
+            .build()
+            .expect("Failed to construct HTTP client");
 
         for target in &targets {
             let ShardSplitTarget {
                 parent_id,
                 node,
                 child_ids,
             } = target;
+
             let client = PageserverClient::new(
                 node.get_id(),
-                self.http_client.clone(),
+                http_client.clone(),
                 node.base_url(),
                 self.config.pageserver_jwt_token.as_deref(),
             );
+
             let response = client
                 .tenant_shard_split(
                     *parent_id,
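On the wiring side, the flag is optional: when absent, the controller falls back to `Duration::MAX`, i.e. effectively no timeout, and local deployments cap the split timeout at 30 seconds as in the `service.rs` hunk above. A small sketch of how the CLI value composes, assuming only the `humantime` crate (`effective_timeout` is an illustrative helper, not part of the patch):

```rust
use std::time::Duration;

/// Illustrative helper mirroring how `async_main` fills `Config::shard_split_request_timeout`.
fn effective_timeout(cli_value: Option<humantime::Duration>) -> Duration {
    // `humantime::Duration` converts into `std::time::Duration`; when the flag is
    // absent, fall back to `Duration::MAX`, i.e. no practical timeout.
    cli_value.map(humantime::Duration::into).unwrap_or(Duration::MAX)
}

fn main() {
    let parsed: humantime::Duration = "5s".parse().expect("valid duration");
    assert_eq!(effective_timeout(Some(parsed)), Duration::from_secs(5));
    assert_eq!(effective_timeout(None), Duration::MAX);
}
```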
diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py
index ab02314288..963a19d640 100644
--- a/test_runner/regress/test_compaction.py
+++ b/test_runner/regress/test_compaction.py
@@ -7,6 +7,7 @@ import time
 from enum import StrEnum
 
 import pytest
+from fixtures.common_types import TenantShardId
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
     NeonEnvBuilder,
@@ -960,6 +961,67 @@ def get_layer_map(env, tenant_shard_id, timeline_id, ps_id):
     return image_layer_count, delta_layer_count
 
 
+def test_image_layer_creation_time_threshold(neon_env_builder: NeonEnvBuilder):
+    """
+    Tests that image layers can be created when the time threshold is reached on non-0 shards.
+    """
+    tenant_conf = {
+        "compaction_threshold": "100",
+        "image_creation_threshold": "100",
+        "image_layer_creation_check_threshold": "1",
+        # disable distance-based image layer creation checks
+        "checkpoint_distance": 10 * 1024 * 1024 * 1024,
+        "checkpoint_timeout": "100ms",
+        "image_layer_force_creation_period": "1s",
+        "pitr_interval": "10s",
+        "gc_period": "1s",
+        "compaction_period": "1s",
+        "lsn_lease_length": "1s",
+    }
+
+    # consider every tenant large to run the image layer generation check more eagerly
+    neon_env_builder.pageserver_config_override = (
+        "image_layer_generation_large_timeline_threshold=0"
+    )
+
+    neon_env_builder.num_pageservers = 1
+    neon_env_builder.num_safekeepers = 1
+    env = neon_env_builder.init_start(
+        initial_tenant_conf=tenant_conf,
+        initial_tenant_shard_count=2,
+        initial_tenant_shard_stripe_size=1,
+    )
+
+    tenant_id = env.initial_tenant
+    timeline_id = env.initial_timeline
+    endpoint = env.endpoints.create_start("main")
+    endpoint.safe_psql("CREATE TABLE foo (id INTEGER, val text)")
+
+    for v in range(10):
+        endpoint.safe_psql(f"INSERT INTO foo (id, val) VALUES ({v}, repeat('abcde{v:0>3}', 500))")
+
+    tenant_shard_id = TenantShardId(tenant_id, 1, 2)
+
+    # Generate some more rows.
+    for v in range(20):
+        endpoint.safe_psql(f"INSERT INTO foo (id, val) VALUES ({v}, repeat('abcde{v:0>3}', 500))")
+
+    # restart the page server so that the logical size on non-0 shards is missing
+    env.pageserver.restart()
+
+    (old_images, old_deltas) = get_layer_map(env, tenant_shard_id, timeline_id, 0)
+    log.info(f"old images: {old_images}, old deltas: {old_deltas}")
+
+    def check_image_creation():
+        (new_images, new_deltas) = get_layer_map(env, tenant_shard_id, timeline_id, 0)
+        log.info(f"images: {new_images}, deltas: {new_deltas}")
+        assert new_images > old_images
+
+    wait_until(check_image_creation)
+
+    endpoint.stop_and_destroy()
+
+
 def test_image_layer_force_creation_period(neon_env_builder: NeonEnvBuilder):
     """
     Tests that page server can force creating new images if image_layer_force_creation_period is enabled
diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py
index 8ff767eca4..5549105188 100644
--- a/test_runner/regress/test_sharding.py
+++ b/test_runner/regress/test_sharding.py
@@ -1673,6 +1673,91 @@ def test_shard_resolve_during_split_abort(neon_env_builder: NeonEnvBuilder):
 # END_HADRON
 
 
+# HADRON
+@pytest.mark.skip(reason="The backpressure change has not been merged yet.")
+def test_back_pressure_per_shard(neon_env_builder: NeonEnvBuilder):
+    """
+    Tests that back pressure knobs are enforced on a per-shard basis instead of at the tenant level.
+    """
+    init_shard_count = 4
+    neon_env_builder.num_pageservers = init_shard_count
+    stripe_size = 1
+
+    env = neon_env_builder.init_start(
+        initial_tenant_shard_count=init_shard_count,
+        initial_tenant_shard_stripe_size=stripe_size,
+        initial_tenant_conf={
+            # disable auto-flush of shards and set max_replication_flush_lag to 15MB.
+            # The backpressure parameters must be enforced at the shard level to avoid stalling PG.
+            "checkpoint_distance": 1 * 1024 * 1024 * 1024,
+            "checkpoint_timeout": "1h",
+        },
+    )
+
+    endpoint = env.endpoints.create(
+        "main",
+        config_lines=[
+            "max_replication_write_lag = 0",
+            "max_replication_apply_lag = 0",
+            "max_replication_flush_lag = 15MB",
+            "neon.max_cluster_size = 10GB",
+        ],
+    )
+    endpoint.respec(skip_pg_catalog_updates=False)  # Needed for databricks_system to get created.
+    endpoint.start()
+
+    # generate 20MB of data
+    endpoint.safe_psql(
+        "CREATE TABLE usertable AS SELECT s AS KEY, repeat('a', 1000) as VALUE from generate_series(1, 20000) s;"
+    )
+    res = endpoint.safe_psql(
+        "SELECT neon.backpressure_throttling_time() as throttling_time", dbname="databricks_system"
+    )[0]
+    assert res[0] == 0, f"throttling_time should be 0, but got {res[0]}"
+
+    endpoint.stop()
+
+
+# HADRON
+def test_shard_split_page_server_timeout(neon_env_builder: NeonEnvBuilder):
+    """
+    Tests that shard split can correctly handle page server timeouts and abort the split.
+    """
+    init_shard_count = 2
+    neon_env_builder.num_pageservers = 1
+    stripe_size = 1
+
+    if neon_env_builder.storage_controller_config is None:
+        neon_env_builder.storage_controller_config = {"shard_split_request_timeout": "5s"}
+    else:
+        neon_env_builder.storage_controller_config["shard_split_request_timeout"] = "5s"
+
+    env = neon_env_builder.init_start(
+        initial_tenant_shard_count=init_shard_count,
+        initial_tenant_shard_stripe_size=stripe_size,
+    )
+
+    env.storage_controller.allowed_errors.extend(
+        [
+            ".*Enqueuing background abort.*",
+            ".*failpoint.*",
+            ".*Failed to abort.*",
+            ".*Exclusive lock by ShardSplit was held.*",
+        ]
+    )
+    env.pageserver.allowed_errors.extend([".*request was dropped before completing.*"])
+
+    endpoint1 = env.endpoints.create_start(branch_name="main")
+
+    env.pageserver.http_client().configure_failpoints(("shard-split-post-finish-pause", "pause"))
+
+    with pytest.raises(StorageControllerApiException):
+        env.storage_controller.tenant_shard_split(env.initial_tenant, shard_count=4)
+
+    env.pageserver.http_client().configure_failpoints(("shard-split-post-finish-pause", "off"))
+    endpoint1.stop_and_destroy()
+
+
 def test_sharding_backpressure(neon_env_builder: NeonEnvBuilder):
     """
     Check a scenario when one of the shards is much slower than others.