From acc075071dbb5f365f809fcf5372216e17adae6f Mon Sep 17 00:00:00 2001 From: Andrew Rudenko Date: Wed, 28 Aug 2024 21:09:26 +0200 Subject: [PATCH] feat(compute_ctl): add periodic `lease lsn` requests for static computes (#7994) Part of #7497 ## Problem Static computes pinned at some fix LSN could be created initially within PITR interval but eventually go out it. To make sure that Static computes are not affected by GC, we need to start using the LSN lease API (introduced in #8084) in compute_ctl. ## Summary of changes **compute_ctl** - Spawn a thread for when a static compute starts to periodically ping pageserver(s) to make LSN lease requests. - Add `test_readonly_node_gc` to test if static compute can read all pages without error. - (test will fail on main without the code change here) **page_service** - `wait_or_get_last_lsn` will now allow `request_lsn` less than `latest_gc_cutoff_lsn` to proceed if there is a lease on `request_lsn`. Signed-off-by: Yuchen Liang Co-authored-by: Alexey Kondratov --- compute_tools/src/bin/compute_ctl.rs | 3 + compute_tools/src/lib.rs | 1 + compute_tools/src/lsn_lease.rs | 186 ++++++++++++++++++++++ pageserver/src/page_service.rs | 45 ++++-- test_runner/regress/test_readonly_node.py | 114 ++++++++++++- 5 files changed, 331 insertions(+), 18 deletions(-) create mode 100644 compute_tools/src/lsn_lease.rs diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 0ba2c1aeb4..9499a7186e 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -44,6 +44,7 @@ use std::{thread, time::Duration}; use anyhow::{Context, Result}; use chrono::Utc; use clap::Arg; +use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static; use signal_hook::consts::{SIGQUIT, SIGTERM}; use signal_hook::{consts::SIGINT, iterator::Signals}; use tracing::{error, info, warn}; @@ -366,6 +367,8 @@ fn wait_spec( state.start_time = now; } + launch_lsn_lease_bg_task_for_static(&compute); + Ok(WaitSpecResult { compute, http_port, diff --git a/compute_tools/src/lib.rs b/compute_tools/src/lib.rs index 543d4462ed..c402d63305 100644 --- a/compute_tools/src/lib.rs +++ b/compute_tools/src/lib.rs @@ -11,6 +11,7 @@ pub mod logger; pub mod catalog; pub mod compute; pub mod extension_server; +pub mod lsn_lease; mod migration; pub mod monitor; pub mod params; diff --git a/compute_tools/src/lsn_lease.rs b/compute_tools/src/lsn_lease.rs new file mode 100644 index 0000000000..7e5917c55f --- /dev/null +++ b/compute_tools/src/lsn_lease.rs @@ -0,0 +1,186 @@ +use anyhow::bail; +use anyhow::Result; +use postgres::{NoTls, SimpleQueryMessage}; +use std::time::SystemTime; +use std::{str::FromStr, sync::Arc, thread, time::Duration}; +use utils::id::TenantId; +use utils::id::TimelineId; + +use compute_api::spec::ComputeMode; +use tracing::{info, warn}; +use utils::{ + lsn::Lsn, + shard::{ShardCount, ShardNumber, TenantShardId}, +}; + +use crate::compute::ComputeNode; + +/// Spawns a background thread to periodically renew LSN leases for static compute. +/// Do nothing if the compute is not in static mode. +pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc) { + let (tenant_id, timeline_id, lsn) = { + let state = compute.state.lock().unwrap(); + let spec = state.pspec.as_ref().expect("Spec must be set"); + match spec.spec.mode { + ComputeMode::Static(lsn) => (spec.tenant_id, spec.timeline_id, lsn), + _ => return, + } + }; + let compute = compute.clone(); + + let span = tracing::info_span!("lsn_lease_bg_task", %tenant_id, %timeline_id, %lsn); + thread::spawn(move || { + let _entered = span.entered(); + if let Err(e) = lsn_lease_bg_task(compute, tenant_id, timeline_id, lsn) { + // TODO: might need stronger error feedback than logging an warning. + warn!("Exited with error: {e}"); + } + }); +} + +/// Renews lsn lease periodically so static compute are not affected by GC. +fn lsn_lease_bg_task( + compute: Arc, + tenant_id: TenantId, + timeline_id: TimelineId, + lsn: Lsn, +) -> Result<()> { + loop { + let valid_until = acquire_lsn_lease_with_retry(&compute, tenant_id, timeline_id, lsn)?; + let valid_duration = valid_until + .duration_since(SystemTime::now()) + .unwrap_or(Duration::ZERO); + + // Sleep for 60 seconds less than the valid duration but no more than half of the valid duration. + let sleep_duration = valid_duration + .saturating_sub(Duration::from_secs(60)) + .max(valid_duration / 2); + + info!( + "Succeeded, sleeping for {} seconds", + sleep_duration.as_secs() + ); + thread::sleep(sleep_duration); + } +} + +/// Acquires lsn lease in a retry loop. Returns the expiration time if a lease is granted. +/// Returns an error if a lease is explicitly not granted. Otherwise, we keep sending requests. +fn acquire_lsn_lease_with_retry( + compute: &Arc, + tenant_id: TenantId, + timeline_id: TimelineId, + lsn: Lsn, +) -> Result { + let mut attempts = 0usize; + let mut retry_period_ms: f64 = 500.0; + const MAX_RETRY_PERIOD_MS: f64 = 60.0 * 1000.0; + + loop { + // Note: List of pageservers is dynamic, need to re-read configs before each attempt. + let configs = { + let state = compute.state.lock().unwrap(); + + let spec = state.pspec.as_ref().expect("spec must be set"); + + let conn_strings = spec.pageserver_connstr.split(','); + + conn_strings + .map(|connstr| { + let mut config = postgres::Config::from_str(connstr).expect("Invalid connstr"); + if let Some(storage_auth_token) = &spec.storage_auth_token { + info!("Got storage auth token from spec file"); + config.password(storage_auth_token.clone()); + } else { + info!("Storage auth token not set"); + } + config + }) + .collect::>() + }; + + let result = try_acquire_lsn_lease(tenant_id, timeline_id, lsn, &configs); + match result { + Ok(Some(res)) => { + return Ok(res); + } + Ok(None) => { + bail!("Permanent error: lease could not be obtained, LSN is behind the GC cutoff"); + } + Err(e) => { + warn!("Failed to acquire lsn lease: {e} (attempt {attempts}"); + + thread::sleep(Duration::from_millis(retry_period_ms as u64)); + retry_period_ms *= 1.5; + retry_period_ms = retry_period_ms.min(MAX_RETRY_PERIOD_MS); + } + } + attempts += 1; + } +} + +/// Tries to acquire an LSN lease through PS page_service API. +fn try_acquire_lsn_lease( + tenant_id: TenantId, + timeline_id: TimelineId, + lsn: Lsn, + configs: &[postgres::Config], +) -> Result> { + fn get_valid_until( + config: &postgres::Config, + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, + lsn: Lsn, + ) -> Result> { + let mut client = config.connect(NoTls)?; + let cmd = format!("lease lsn {} {} {} ", tenant_shard_id, timeline_id, lsn); + let res = client.simple_query(&cmd)?; + let msg = match res.first() { + Some(msg) => msg, + None => bail!("empty response"), + }; + let row = match msg { + SimpleQueryMessage::Row(row) => row, + _ => bail!("error parsing lsn lease response"), + }; + + // Note: this will be None if a lease is explicitly not granted. + let valid_until_str = row.get("valid_until"); + + let valid_until = valid_until_str.map(|s| { + SystemTime::UNIX_EPOCH + .checked_add(Duration::from_millis(u128::from_str(s).unwrap() as u64)) + .expect("Time larger than max SystemTime could handle") + }); + Ok(valid_until) + } + + let shard_count = configs.len(); + + let valid_until = if shard_count > 1 { + configs + .iter() + .enumerate() + .map(|(shard_number, config)| { + let tenant_shard_id = TenantShardId { + tenant_id, + shard_count: ShardCount::new(shard_count as u8), + shard_number: ShardNumber(shard_number as u8), + }; + get_valid_until(config, tenant_shard_id, timeline_id, lsn) + }) + .collect::>>>()? + .into_iter() + .min() + .unwrap() + } else { + get_valid_until( + &configs[0], + TenantShardId::unsharded(tenant_id), + timeline_id, + lsn, + )? + }; + + Ok(valid_until) +} diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index cb1ab70147..39c6a6fb74 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -753,16 +753,21 @@ impl PageServerHandler { } if request_lsn < **latest_gc_cutoff_lsn { - // Check explicitly for INVALID just to get a less scary error message if the - // request is obviously bogus - return Err(if request_lsn == Lsn::INVALID { - PageStreamError::BadRequest("invalid LSN(0) in request".into()) - } else { - PageStreamError::BadRequest(format!( + let gc_info = &timeline.gc_info.read().unwrap(); + if !gc_info.leases.contains_key(&request_lsn) { + // The requested LSN is below gc cutoff and is not guarded by a lease. + + // Check explicitly for INVALID just to get a less scary error message if the + // request is obviously bogus + return Err(if request_lsn == Lsn::INVALID { + PageStreamError::BadRequest("invalid LSN(0) in request".into()) + } else { + PageStreamError::BadRequest(format!( "tried to request a page version that was garbage collected. requested at {} gc cutoff {}", request_lsn, **latest_gc_cutoff_lsn ).into()) - }); + }); + } } // Wait for WAL up to 'not_modified_since' to arrive, if necessary @@ -789,6 +794,8 @@ impl PageServerHandler { } } + /// Handles the lsn lease request. + /// If a lease cannot be obtained, the client will receive NULL. #[instrument(skip_all, fields(shard_id, %lsn))] async fn handle_make_lsn_lease( &mut self, @@ -811,19 +818,25 @@ impl PageServerHandler { .await?; set_tracing_field_shard_id(&timeline); - let lease = timeline.make_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)?; - let valid_until = lease - .valid_until - .duration_since(SystemTime::UNIX_EPOCH) - .map_err(|e| QueryError::Other(e.into()))?; + let lease = timeline + .make_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx) + .inspect_err(|e| { + warn!("{e}"); + }) + .ok(); + let valid_until_str = lease.map(|l| { + l.valid_until + .duration_since(SystemTime::UNIX_EPOCH) + .expect("valid_until is earlier than UNIX_EPOCH") + .as_millis() + .to_string() + }); + let bytes = valid_until_str.as_ref().map(|x| x.as_bytes()); pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col( b"valid_until", )]))? - .write_message_noflush(&BeMessage::DataRow(&[Some( - &valid_until.as_millis().to_be_bytes(), - )]))? - .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; + .write_message_noflush(&BeMessage::DataRow(&[bytes]))?; Ok(()) } diff --git a/test_runner/regress/test_readonly_node.py b/test_runner/regress/test_readonly_node.py index ba8b91e84d..368f60127e 100644 --- a/test_runner/regress/test_readonly_node.py +++ b/test_runner/regress/test_readonly_node.py @@ -1,7 +1,15 @@ +import time + import pytest from fixtures.common_types import Lsn from fixtures.log_helper import log -from fixtures.neon_fixtures import NeonEnv +from fixtures.neon_fixtures import ( + Endpoint, + NeonEnv, + NeonEnvBuilder, + last_flush_lsn_upload, + tenant_get_shards, +) from fixtures.pageserver.utils import wait_for_last_record_lsn from fixtures.utils import query_scalar @@ -17,7 +25,12 @@ def test_readonly_node(neon_simple_env: NeonEnv): env.neon_cli.create_branch("test_readonly_node", "empty") endpoint_main = env.endpoints.create_start("test_readonly_node") - env.pageserver.allowed_errors.append(".*basebackup .* failed: invalid basebackup lsn.*") + env.pageserver.allowed_errors.extend( + [ + ".*basebackup .* failed: invalid basebackup lsn.*", + ".*page_service.*handle_make_lsn_lease.*.*tried to request a page version that was garbage collected", + ] + ) main_pg_conn = endpoint_main.connect() main_cur = main_pg_conn.cursor() @@ -105,6 +118,103 @@ def test_readonly_node(neon_simple_env: NeonEnv): ) +def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder): + """ + Test static endpoint is protected from GC by acquiring and renewing lsn leases. + """ + + neon_env_builder.num_pageservers = 2 + # GC is manual triggered. + env = neon_env_builder.init_start( + initial_tenant_conf={ + # small checkpointing and compaction targets to ensure we generate many upload operations + "checkpoint_distance": f"{128 * 1024}", + "compaction_threshold": "1", + "compaction_target_size": f"{128 * 1024}", + # no PITR horizon, we specify the horizon when we request on-demand GC + "pitr_interval": "0s", + # disable background compaction and GC. We invoke it manually when we want it to happen. + "gc_period": "0s", + "compaction_period": "0s", + # create image layers eagerly, so that GC can remove some layers + "image_creation_threshold": "1", + "image_layer_creation_check_threshold": "0", + # Short lease length to fit test. + "lsn_lease_length": "3s", + }, + initial_tenant_shard_count=2, + ) + + ROW_COUNT = 500 + + def generate_updates_on_main( + env: NeonEnv, + ep_main: Endpoint, + data: int, + start=1, + end=ROW_COUNT, + ) -> Lsn: + """ + Generates some load on main branch that results in some uploads. + """ + with ep_main.cursor() as cur: + cur.execute( + f"INSERT INTO t0 (v0, v1) SELECT g, '{data}' FROM generate_series({start}, {end}) g ON CONFLICT (v0) DO UPDATE SET v1 = EXCLUDED.v1" + ) + cur.execute("VACUUM t0") + last_flush_lsn = last_flush_lsn_upload( + env, ep_main, env.initial_tenant, env.initial_timeline + ) + return last_flush_lsn + + # Insert some records on main branch + with env.endpoints.create_start("main") as ep_main: + with ep_main.cursor() as cur: + cur.execute("CREATE TABLE t0(v0 int primary key, v1 text)") + lsn = None + for i in range(2): + lsn = generate_updates_on_main(env, ep_main, i) + + with env.endpoints.create_start( + branch_name="main", + endpoint_id="static", + lsn=lsn, + ) as ep_static: + with ep_static.cursor() as cur: + cur.execute("SELECT count(*) FROM t0") + assert cur.fetchone() == (ROW_COUNT,) + + time.sleep(3) + + generate_updates_on_main(env, ep_main, i, end=100) + + # Trigger GC + for shard, ps in tenant_get_shards(env, env.initial_tenant): + client = ps.http_client() + gc_result = client.timeline_gc(shard, env.initial_timeline, 0) + log.info(f"{gc_result=}") + + assert ( + gc_result["layers_removed"] == 0 + ), "No layers should be removed, old layers are guarded by leases." + + with ep_static.cursor() as cur: + cur.execute("SELECT count(*) FROM t0") + assert cur.fetchone() == (ROW_COUNT,) + + # Do some update so we can increment latest_gc_cutoff + generate_updates_on_main(env, ep_main, i, end=100) + + # Now trigger GC again, layers should be removed. + time.sleep(4) + for shard, ps in tenant_get_shards(env, env.initial_tenant): + client = ps.http_client() + gc_result = client.timeline_gc(shard, env.initial_timeline, 0) + log.info(f"{gc_result=}") + + assert gc_result["layers_removed"] > 0, "Old layers should be removed after leases expired." + + # Similar test, but with more data, and we force checkpoints def test_timetravel(neon_simple_env: NeonEnv): env = neon_simple_env