diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs
index 0ba2c1aeb4..9499a7186e 100644
--- a/compute_tools/src/bin/compute_ctl.rs
+++ b/compute_tools/src/bin/compute_ctl.rs
@@ -44,6 +44,7 @@ use std::{thread, time::Duration};
 use anyhow::{Context, Result};
 use chrono::Utc;
 use clap::Arg;
+use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static;
 use signal_hook::consts::{SIGQUIT, SIGTERM};
 use signal_hook::{consts::SIGINT, iterator::Signals};
 use tracing::{error, info, warn};
@@ -366,6 +367,8 @@ fn wait_spec(
         state.start_time = now;
     }
 
+    launch_lsn_lease_bg_task_for_static(&compute);
+
     Ok(WaitSpecResult {
         compute,
         http_port,
diff --git a/compute_tools/src/lib.rs b/compute_tools/src/lib.rs
index 543d4462ed..c402d63305 100644
--- a/compute_tools/src/lib.rs
+++ b/compute_tools/src/lib.rs
@@ -11,6 +11,7 @@ pub mod logger;
 pub mod catalog;
 pub mod compute;
 pub mod extension_server;
+pub mod lsn_lease;
 mod migration;
 pub mod monitor;
 pub mod params;
diff --git a/compute_tools/src/lsn_lease.rs b/compute_tools/src/lsn_lease.rs
new file mode 100644
index 0000000000..7e5917c55f
--- /dev/null
+++ b/compute_tools/src/lsn_lease.rs
@@ -0,0 +1,186 @@
+use anyhow::bail;
+use anyhow::Result;
+use postgres::{NoTls, SimpleQueryMessage};
+use std::time::SystemTime;
+use std::{str::FromStr, sync::Arc, thread, time::Duration};
+use utils::id::TenantId;
+use utils::id::TimelineId;
+
+use compute_api::spec::ComputeMode;
+use tracing::{info, warn};
+use utils::{
+    lsn::Lsn,
+    shard::{ShardCount, ShardNumber, TenantShardId},
+};
+
+use crate::compute::ComputeNode;
+
+/// Spawns a background thread to periodically renew LSN leases for a static compute.
+/// Does nothing if the compute is not in static mode.
+pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc<ComputeNode>) {
+    let (tenant_id, timeline_id, lsn) = {
+        let state = compute.state.lock().unwrap();
+        let spec = state.pspec.as_ref().expect("Spec must be set");
+        match spec.spec.mode {
+            ComputeMode::Static(lsn) => (spec.tenant_id, spec.timeline_id, lsn),
+            _ => return,
+        }
+    };
+    let compute = compute.clone();
+
+    let span = tracing::info_span!("lsn_lease_bg_task", %tenant_id, %timeline_id, %lsn);
+    thread::spawn(move || {
+        let _entered = span.entered();
+        if let Err(e) = lsn_lease_bg_task(compute, tenant_id, timeline_id, lsn) {
+            // TODO: might need stronger error feedback than logging a warning.
+            warn!("Exited with error: {e}");
+        }
+    });
+}
+
+/// Renews the LSN lease periodically so that a static compute is not affected by GC.
+fn lsn_lease_bg_task(
+    compute: Arc<ComputeNode>,
+    tenant_id: TenantId,
+    timeline_id: TimelineId,
+    lsn: Lsn,
+) -> Result<()> {
+    loop {
+        let valid_until = acquire_lsn_lease_with_retry(&compute, tenant_id, timeline_id, lsn)?;
+        let valid_duration = valid_until
+            .duration_since(SystemTime::now())
+            .unwrap_or(Duration::ZERO);
+
+        // Sleep for 60 seconds less than the valid duration, but no less than half of the valid duration.
+        let sleep_duration = valid_duration
+            .saturating_sub(Duration::from_secs(60))
+            .max(valid_duration / 2);
+
+        info!(
+            "Succeeded, sleeping for {} seconds",
+            sleep_duration.as_secs()
+        );
+        thread::sleep(sleep_duration);
+    }
+}
+
+/// Acquires an LSN lease in a retry loop. Returns the expiration time if a lease is granted.
+/// Returns an error if a lease is explicitly not granted. Otherwise, we keep sending requests.
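+/// Retries start 500 ms apart and back off by a factor of 1.5, up to a maximum of 60 s between attempts.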
+fn acquire_lsn_lease_with_retry(
+    compute: &Arc<ComputeNode>,
+    tenant_id: TenantId,
+    timeline_id: TimelineId,
+    lsn: Lsn,
+) -> Result<SystemTime> {
+    let mut attempts = 0usize;
+    let mut retry_period_ms: f64 = 500.0;
+    const MAX_RETRY_PERIOD_MS: f64 = 60.0 * 1000.0;
+
+    loop {
+        // Note: the list of pageservers is dynamic; re-read the configs before each attempt.
+        let configs = {
+            let state = compute.state.lock().unwrap();
+
+            let spec = state.pspec.as_ref().expect("spec must be set");
+
+            let conn_strings = spec.pageserver_connstr.split(',');
+
+            conn_strings
+                .map(|connstr| {
+                    let mut config = postgres::Config::from_str(connstr).expect("Invalid connstr");
+                    if let Some(storage_auth_token) = &spec.storage_auth_token {
+                        info!("Got storage auth token from spec file");
+                        config.password(storage_auth_token.clone());
+                    } else {
+                        info!("Storage auth token not set");
+                    }
+                    config
+                })
+                .collect::<Vec<_>>()
+        };
+
+        let result = try_acquire_lsn_lease(tenant_id, timeline_id, lsn, &configs);
+        match result {
+            Ok(Some(res)) => {
+                return Ok(res);
+            }
+            Ok(None) => {
+                bail!("Permanent error: lease could not be obtained, LSN is behind the GC cutoff");
+            }
+            Err(e) => {
+                warn!("Failed to acquire LSN lease: {e} (attempt {attempts})");
+
+                thread::sleep(Duration::from_millis(retry_period_ms as u64));
+                retry_period_ms *= 1.5;
+                retry_period_ms = retry_period_ms.min(MAX_RETRY_PERIOD_MS);
+            }
+        }
+        attempts += 1;
+    }
+}
+
+/// Tries to acquire an LSN lease through the pageserver's page_service API.
+fn try_acquire_lsn_lease(
+    tenant_id: TenantId,
+    timeline_id: TimelineId,
+    lsn: Lsn,
+    configs: &[postgres::Config],
+) -> Result<Option<SystemTime>> {
+    fn get_valid_until(
+        config: &postgres::Config,
+        tenant_shard_id: TenantShardId,
+        timeline_id: TimelineId,
+        lsn: Lsn,
+    ) -> Result<Option<SystemTime>> {
+        let mut client = config.connect(NoTls)?;
+        let cmd = format!("lease lsn {} {} {} ", tenant_shard_id, timeline_id, lsn);
+        let res = client.simple_query(&cmd)?;
+        let msg = match res.first() {
+            Some(msg) => msg,
+            None => bail!("empty response"),
+        };
+        let row = match msg {
+            SimpleQueryMessage::Row(row) => row,
+            _ => bail!("error parsing lsn lease response"),
+        };
+
+        // Note: this will be None if a lease is explicitly not granted.
+        let valid_until_str = row.get("valid_until");
+
+        let valid_until = valid_until_str.map(|s| {
+            SystemTime::UNIX_EPOCH
+                .checked_add(Duration::from_millis(u128::from_str(s).unwrap() as u64))
+                .expect("Time larger than max SystemTime could handle")
+        });
+        Ok(valid_until)
+    }
+
+    let shard_count = configs.len();
+
+    let valid_until = if shard_count > 1 {
+        configs
+            .iter()
+            .enumerate()
+            .map(|(shard_number, config)| {
+                let tenant_shard_id = TenantShardId {
+                    tenant_id,
+                    shard_count: ShardCount::new(shard_count as u8),
+                    shard_number: ShardNumber(shard_number as u8),
+                };
+                get_valid_until(config, tenant_shard_id, timeline_id, lsn)
+            })
+            .collect::<Result<Vec<Option<SystemTime>>>>()?
+            .into_iter()
+            .min()
+            .unwrap()
+    } else {
+        get_valid_until(
+            &configs[0],
+            TenantShardId::unsharded(tenant_id),
+            timeline_id,
+            lsn,
+        )?
+    };
+
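+    // `valid_until` is the earliest expiration across all shards; it is `None` if any shard
+    // declined to grant a lease.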
+    Ok(valid_until)
+}
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index cb1ab70147..39c6a6fb74 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -753,16 +753,21 @@ impl PageServerHandler {
         }
 
         if request_lsn < **latest_gc_cutoff_lsn {
-            // Check explicitly for INVALID just to get a less scary error message if the
-            // request is obviously bogus
-            return Err(if request_lsn == Lsn::INVALID {
-                PageStreamError::BadRequest("invalid LSN(0) in request".into())
-            } else {
-                PageStreamError::BadRequest(format!(
+            let gc_info = &timeline.gc_info.read().unwrap();
+            if !gc_info.leases.contains_key(&request_lsn) {
+                // The requested LSN is below the GC cutoff and is not guarded by a lease.
+
+                // Check explicitly for INVALID just to get a less scary error message if the
+                // request is obviously bogus
+                return Err(if request_lsn == Lsn::INVALID {
+                    PageStreamError::BadRequest("invalid LSN(0) in request".into())
+                } else {
+                    PageStreamError::BadRequest(format!(
                     "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
                     request_lsn, **latest_gc_cutoff_lsn
                 ).into())
-            });
+                });
+            }
         }
 
         // Wait for WAL up to 'not_modified_since' to arrive, if necessary
@@ -789,6 +794,8 @@
         }
     }
 
+    /// Handles the LSN lease request.
+    /// If a lease cannot be obtained, the client will receive NULL.
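+    /// On success, the response is a single `valid_until` column containing the lease
+    /// expiration as milliseconds since the Unix epoch.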
+    #[instrument(skip_all, fields(shard_id, %lsn))]
     async fn handle_make_lsn_lease(
         &mut self,
@@ -811,19 +818,25 @@
             .await?;
         set_tracing_field_shard_id(&timeline);
 
-        let lease = timeline.make_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)?;
-        let valid_until = lease
-            .valid_until
-            .duration_since(SystemTime::UNIX_EPOCH)
-            .map_err(|e| QueryError::Other(e.into()))?;
+        let lease = timeline
+            .make_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
+            .inspect_err(|e| {
+                warn!("{e}");
+            })
+            .ok();
+        let valid_until_str = lease.map(|l| {
+            l.valid_until
+                .duration_since(SystemTime::UNIX_EPOCH)
+                .expect("valid_until is earlier than UNIX_EPOCH")
+                .as_millis()
+                .to_string()
+        });
+        let bytes = valid_until_str.as_ref().map(|x| x.as_bytes());
 
         pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
             b"valid_until",
         )]))?
-        .write_message_noflush(&BeMessage::DataRow(&[Some(
-            &valid_until.as_millis().to_be_bytes(),
-        )]))?
-        .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
+        .write_message_noflush(&BeMessage::DataRow(&[bytes]))?;
 
         Ok(())
     }
diff --git a/test_runner/regress/test_readonly_node.py b/test_runner/regress/test_readonly_node.py
index ba8b91e84d..368f60127e 100644
--- a/test_runner/regress/test_readonly_node.py
+++ b/test_runner/regress/test_readonly_node.py
@@ -1,7 +1,15 @@
+import time
+
 import pytest
 from fixtures.common_types import Lsn
 from fixtures.log_helper import log
-from fixtures.neon_fixtures import NeonEnv
+from fixtures.neon_fixtures import (
+    Endpoint,
+    NeonEnv,
+    NeonEnvBuilder,
+    last_flush_lsn_upload,
+    tenant_get_shards,
+)
 from fixtures.pageserver.utils import wait_for_last_record_lsn
 from fixtures.utils import query_scalar
 
@@ -17,7 +25,12 @@ def test_readonly_node(neon_simple_env: NeonEnv):
     env.neon_cli.create_branch("test_readonly_node", "empty")
     endpoint_main = env.endpoints.create_start("test_readonly_node")
 
-    env.pageserver.allowed_errors.append(".*basebackup .* failed: invalid basebackup lsn.*")
+    env.pageserver.allowed_errors.extend(
+        [
+            ".*basebackup .* failed: invalid basebackup lsn.*",
+            ".*page_service.*handle_make_lsn_lease.*tried to request a page version that was garbage collected",
+        ]
+    )
 
     main_pg_conn = endpoint_main.connect()
     main_cur = main_pg_conn.cursor()
@@ -105,6 +118,103 @@ def test_readonly_node(neon_simple_env: NeonEnv):
     )
 
 
+def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
+    """
+    Test that a static endpoint is protected from GC by acquiring and renewing LSN leases.
+    """
+
+    neon_env_builder.num_pageservers = 2
+    # GC is triggered manually.
+    env = neon_env_builder.init_start(
+        initial_tenant_conf={
+            # small checkpointing and compaction targets to ensure we generate many upload operations
+            "checkpoint_distance": f"{128 * 1024}",
+            "compaction_threshold": "1",
+            "compaction_target_size": f"{128 * 1024}",
+            # no PITR horizon, we specify the horizon when we request on-demand GC
+            "pitr_interval": "0s",
+            # disable background compaction and GC. We invoke it manually when we want it to happen.
+            "gc_period": "0s",
+            "compaction_period": "0s",
+            # create image layers eagerly, so that GC can remove some layers
+            "image_creation_threshold": "1",
+            "image_layer_creation_check_threshold": "0",
+            # Short lease length to fit the test.
+            "lsn_lease_length": "3s",
+        },
+        initial_tenant_shard_count=2,
+    )
+
+    ROW_COUNT = 500
+
+    def generate_updates_on_main(
+        env: NeonEnv,
+        ep_main: Endpoint,
+        data: int,
+        start=1,
+        end=ROW_COUNT,
+    ) -> Lsn:
+        """
+        Generates some load on the main branch that results in some uploads.
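+        Returns the last flush LSN after the generated data has been uploaded.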
+ """ + with ep_main.cursor() as cur: + cur.execute( + f"INSERT INTO t0 (v0, v1) SELECT g, '{data}' FROM generate_series({start}, {end}) g ON CONFLICT (v0) DO UPDATE SET v1 = EXCLUDED.v1" + ) + cur.execute("VACUUM t0") + last_flush_lsn = last_flush_lsn_upload( + env, ep_main, env.initial_tenant, env.initial_timeline + ) + return last_flush_lsn + + # Insert some records on main branch + with env.endpoints.create_start("main") as ep_main: + with ep_main.cursor() as cur: + cur.execute("CREATE TABLE t0(v0 int primary key, v1 text)") + lsn = None + for i in range(2): + lsn = generate_updates_on_main(env, ep_main, i) + + with env.endpoints.create_start( + branch_name="main", + endpoint_id="static", + lsn=lsn, + ) as ep_static: + with ep_static.cursor() as cur: + cur.execute("SELECT count(*) FROM t0") + assert cur.fetchone() == (ROW_COUNT,) + + time.sleep(3) + + generate_updates_on_main(env, ep_main, i, end=100) + + # Trigger GC + for shard, ps in tenant_get_shards(env, env.initial_tenant): + client = ps.http_client() + gc_result = client.timeline_gc(shard, env.initial_timeline, 0) + log.info(f"{gc_result=}") + + assert ( + gc_result["layers_removed"] == 0 + ), "No layers should be removed, old layers are guarded by leases." + + with ep_static.cursor() as cur: + cur.execute("SELECT count(*) FROM t0") + assert cur.fetchone() == (ROW_COUNT,) + + # Do some update so we can increment latest_gc_cutoff + generate_updates_on_main(env, ep_main, i, end=100) + + # Now trigger GC again, layers should be removed. + time.sleep(4) + for shard, ps in tenant_get_shards(env, env.initial_tenant): + client = ps.http_client() + gc_result = client.timeline_gc(shard, env.initial_timeline, 0) + log.info(f"{gc_result=}") + + assert gc_result["layers_removed"] > 0, "Old layers should be removed after leases expired." + + # Similar test, but with more data, and we force checkpoints def test_timetravel(neon_simple_env: NeonEnv): env = neon_simple_env