feat(compute_ctl): add periodic lease lsn requests for static computes (#7994)

Part of #7497

## Problem

Static computes pinned at some fix LSN could be created initially within
PITR interval but eventually go out it. To make sure that Static
computes are not affected by GC, we need to start using the LSN lease
API (introduced in #8084) in compute_ctl.

## Summary of changes

**compute_ctl**
- Spawn a thread for when a static compute starts to periodically ping
pageserver(s) to make LSN lease requests.
- Add `test_readonly_node_gc` to test if static compute can read all
pages without error.
  - (test will fail on main without the code change here)

**page_service**
- `wait_or_get_last_lsn` will now allow `request_lsn` less than
`latest_gc_cutoff_lsn` to proceed if there is a lease on `request_lsn`.

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
Co-authored-by: Alexey Kondratov <kondratov.aleksey@gmail.com>
This commit is contained in:
Andrew Rudenko
2024-08-28 21:09:26 +02:00
committed by GitHub
parent 9627747d35
commit acc075071d
5 changed files with 331 additions and 18 deletions

View File

@@ -44,6 +44,7 @@ use std::{thread, time::Duration};
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Arg;
use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static;
use signal_hook::consts::{SIGQUIT, SIGTERM};
use signal_hook::{consts::SIGINT, iterator::Signals};
use tracing::{error, info, warn};
@@ -366,6 +367,8 @@ fn wait_spec(
state.start_time = now;
}
launch_lsn_lease_bg_task_for_static(&compute);
Ok(WaitSpecResult {
compute,
http_port,

View File

@@ -11,6 +11,7 @@ pub mod logger;
pub mod catalog;
pub mod compute;
pub mod extension_server;
pub mod lsn_lease;
mod migration;
pub mod monitor;
pub mod params;

View File

@@ -0,0 +1,186 @@
use anyhow::bail;
use anyhow::Result;
use postgres::{NoTls, SimpleQueryMessage};
use std::time::SystemTime;
use std::{str::FromStr, sync::Arc, thread, time::Duration};
use utils::id::TenantId;
use utils::id::TimelineId;
use compute_api::spec::ComputeMode;
use tracing::{info, warn};
use utils::{
lsn::Lsn,
shard::{ShardCount, ShardNumber, TenantShardId},
};
use crate::compute::ComputeNode;
/// Spawns a background thread to periodically renew LSN leases for static compute.
/// Do nothing if the compute is not in static mode.
pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc<ComputeNode>) {
let (tenant_id, timeline_id, lsn) = {
let state = compute.state.lock().unwrap();
let spec = state.pspec.as_ref().expect("Spec must be set");
match spec.spec.mode {
ComputeMode::Static(lsn) => (spec.tenant_id, spec.timeline_id, lsn),
_ => return,
}
};
let compute = compute.clone();
let span = tracing::info_span!("lsn_lease_bg_task", %tenant_id, %timeline_id, %lsn);
thread::spawn(move || {
let _entered = span.entered();
if let Err(e) = lsn_lease_bg_task(compute, tenant_id, timeline_id, lsn) {
// TODO: might need stronger error feedback than logging an warning.
warn!("Exited with error: {e}");
}
});
}
/// Renews lsn lease periodically so static compute are not affected by GC.
fn lsn_lease_bg_task(
compute: Arc<ComputeNode>,
tenant_id: TenantId,
timeline_id: TimelineId,
lsn: Lsn,
) -> Result<()> {
loop {
let valid_until = acquire_lsn_lease_with_retry(&compute, tenant_id, timeline_id, lsn)?;
let valid_duration = valid_until
.duration_since(SystemTime::now())
.unwrap_or(Duration::ZERO);
// Sleep for 60 seconds less than the valid duration but no more than half of the valid duration.
let sleep_duration = valid_duration
.saturating_sub(Duration::from_secs(60))
.max(valid_duration / 2);
info!(
"Succeeded, sleeping for {} seconds",
sleep_duration.as_secs()
);
thread::sleep(sleep_duration);
}
}
/// Acquires lsn lease in a retry loop. Returns the expiration time if a lease is granted.
/// Returns an error if a lease is explicitly not granted. Otherwise, we keep sending requests.
fn acquire_lsn_lease_with_retry(
compute: &Arc<ComputeNode>,
tenant_id: TenantId,
timeline_id: TimelineId,
lsn: Lsn,
) -> Result<SystemTime> {
let mut attempts = 0usize;
let mut retry_period_ms: f64 = 500.0;
const MAX_RETRY_PERIOD_MS: f64 = 60.0 * 1000.0;
loop {
// Note: List of pageservers is dynamic, need to re-read configs before each attempt.
let configs = {
let state = compute.state.lock().unwrap();
let spec = state.pspec.as_ref().expect("spec must be set");
let conn_strings = spec.pageserver_connstr.split(',');
conn_strings
.map(|connstr| {
let mut config = postgres::Config::from_str(connstr).expect("Invalid connstr");
if let Some(storage_auth_token) = &spec.storage_auth_token {
info!("Got storage auth token from spec file");
config.password(storage_auth_token.clone());
} else {
info!("Storage auth token not set");
}
config
})
.collect::<Vec<_>>()
};
let result = try_acquire_lsn_lease(tenant_id, timeline_id, lsn, &configs);
match result {
Ok(Some(res)) => {
return Ok(res);
}
Ok(None) => {
bail!("Permanent error: lease could not be obtained, LSN is behind the GC cutoff");
}
Err(e) => {
warn!("Failed to acquire lsn lease: {e} (attempt {attempts}");
thread::sleep(Duration::from_millis(retry_period_ms as u64));
retry_period_ms *= 1.5;
retry_period_ms = retry_period_ms.min(MAX_RETRY_PERIOD_MS);
}
}
attempts += 1;
}
}
/// Tries to acquire an LSN lease through PS page_service API.
fn try_acquire_lsn_lease(
tenant_id: TenantId,
timeline_id: TimelineId,
lsn: Lsn,
configs: &[postgres::Config],
) -> Result<Option<SystemTime>> {
fn get_valid_until(
config: &postgres::Config,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
lsn: Lsn,
) -> Result<Option<SystemTime>> {
let mut client = config.connect(NoTls)?;
let cmd = format!("lease lsn {} {} {} ", tenant_shard_id, timeline_id, lsn);
let res = client.simple_query(&cmd)?;
let msg = match res.first() {
Some(msg) => msg,
None => bail!("empty response"),
};
let row = match msg {
SimpleQueryMessage::Row(row) => row,
_ => bail!("error parsing lsn lease response"),
};
// Note: this will be None if a lease is explicitly not granted.
let valid_until_str = row.get("valid_until");
let valid_until = valid_until_str.map(|s| {
SystemTime::UNIX_EPOCH
.checked_add(Duration::from_millis(u128::from_str(s).unwrap() as u64))
.expect("Time larger than max SystemTime could handle")
});
Ok(valid_until)
}
let shard_count = configs.len();
let valid_until = if shard_count > 1 {
configs
.iter()
.enumerate()
.map(|(shard_number, config)| {
let tenant_shard_id = TenantShardId {
tenant_id,
shard_count: ShardCount::new(shard_count as u8),
shard_number: ShardNumber(shard_number as u8),
};
get_valid_until(config, tenant_shard_id, timeline_id, lsn)
})
.collect::<Result<Vec<Option<SystemTime>>>>()?
.into_iter()
.min()
.unwrap()
} else {
get_valid_until(
&configs[0],
TenantShardId::unsharded(tenant_id),
timeline_id,
lsn,
)?
};
Ok(valid_until)
}

View File

@@ -753,16 +753,21 @@ impl PageServerHandler {
}
if request_lsn < **latest_gc_cutoff_lsn {
// Check explicitly for INVALID just to get a less scary error message if the
// request is obviously bogus
return Err(if request_lsn == Lsn::INVALID {
PageStreamError::BadRequest("invalid LSN(0) in request".into())
} else {
PageStreamError::BadRequest(format!(
let gc_info = &timeline.gc_info.read().unwrap();
if !gc_info.leases.contains_key(&request_lsn) {
// The requested LSN is below gc cutoff and is not guarded by a lease.
// Check explicitly for INVALID just to get a less scary error message if the
// request is obviously bogus
return Err(if request_lsn == Lsn::INVALID {
PageStreamError::BadRequest("invalid LSN(0) in request".into())
} else {
PageStreamError::BadRequest(format!(
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
request_lsn, **latest_gc_cutoff_lsn
).into())
});
});
}
}
// Wait for WAL up to 'not_modified_since' to arrive, if necessary
@@ -789,6 +794,8 @@ impl PageServerHandler {
}
}
/// Handles the lsn lease request.
/// If a lease cannot be obtained, the client will receive NULL.
#[instrument(skip_all, fields(shard_id, %lsn))]
async fn handle_make_lsn_lease<IO>(
&mut self,
@@ -811,19 +818,25 @@ impl PageServerHandler {
.await?;
set_tracing_field_shard_id(&timeline);
let lease = timeline.make_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)?;
let valid_until = lease
.valid_until
.duration_since(SystemTime::UNIX_EPOCH)
.map_err(|e| QueryError::Other(e.into()))?;
let lease = timeline
.make_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
.inspect_err(|e| {
warn!("{e}");
})
.ok();
let valid_until_str = lease.map(|l| {
l.valid_until
.duration_since(SystemTime::UNIX_EPOCH)
.expect("valid_until is earlier than UNIX_EPOCH")
.as_millis()
.to_string()
});
let bytes = valid_until_str.as_ref().map(|x| x.as_bytes());
pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
b"valid_until",
)]))?
.write_message_noflush(&BeMessage::DataRow(&[Some(
&valid_until.as_millis().to_be_bytes(),
)]))?
.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
.write_message_noflush(&BeMessage::DataRow(&[bytes]))?;
Ok(())
}

View File

@@ -1,7 +1,15 @@
import time
import pytest
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
NeonEnvBuilder,
last_flush_lsn_upload,
tenant_get_shards,
)
from fixtures.pageserver.utils import wait_for_last_record_lsn
from fixtures.utils import query_scalar
@@ -17,7 +25,12 @@ def test_readonly_node(neon_simple_env: NeonEnv):
env.neon_cli.create_branch("test_readonly_node", "empty")
endpoint_main = env.endpoints.create_start("test_readonly_node")
env.pageserver.allowed_errors.append(".*basebackup .* failed: invalid basebackup lsn.*")
env.pageserver.allowed_errors.extend(
[
".*basebackup .* failed: invalid basebackup lsn.*",
".*page_service.*handle_make_lsn_lease.*.*tried to request a page version that was garbage collected",
]
)
main_pg_conn = endpoint_main.connect()
main_cur = main_pg_conn.cursor()
@@ -105,6 +118,103 @@ def test_readonly_node(neon_simple_env: NeonEnv):
)
def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
"""
Test static endpoint is protected from GC by acquiring and renewing lsn leases.
"""
neon_env_builder.num_pageservers = 2
# GC is manual triggered.
env = neon_env_builder.init_start(
initial_tenant_conf={
# small checkpointing and compaction targets to ensure we generate many upload operations
"checkpoint_distance": f"{128 * 1024}",
"compaction_threshold": "1",
"compaction_target_size": f"{128 * 1024}",
# no PITR horizon, we specify the horizon when we request on-demand GC
"pitr_interval": "0s",
# disable background compaction and GC. We invoke it manually when we want it to happen.
"gc_period": "0s",
"compaction_period": "0s",
# create image layers eagerly, so that GC can remove some layers
"image_creation_threshold": "1",
"image_layer_creation_check_threshold": "0",
# Short lease length to fit test.
"lsn_lease_length": "3s",
},
initial_tenant_shard_count=2,
)
ROW_COUNT = 500
def generate_updates_on_main(
env: NeonEnv,
ep_main: Endpoint,
data: int,
start=1,
end=ROW_COUNT,
) -> Lsn:
"""
Generates some load on main branch that results in some uploads.
"""
with ep_main.cursor() as cur:
cur.execute(
f"INSERT INTO t0 (v0, v1) SELECT g, '{data}' FROM generate_series({start}, {end}) g ON CONFLICT (v0) DO UPDATE SET v1 = EXCLUDED.v1"
)
cur.execute("VACUUM t0")
last_flush_lsn = last_flush_lsn_upload(
env, ep_main, env.initial_tenant, env.initial_timeline
)
return last_flush_lsn
# Insert some records on main branch
with env.endpoints.create_start("main") as ep_main:
with ep_main.cursor() as cur:
cur.execute("CREATE TABLE t0(v0 int primary key, v1 text)")
lsn = None
for i in range(2):
lsn = generate_updates_on_main(env, ep_main, i)
with env.endpoints.create_start(
branch_name="main",
endpoint_id="static",
lsn=lsn,
) as ep_static:
with ep_static.cursor() as cur:
cur.execute("SELECT count(*) FROM t0")
assert cur.fetchone() == (ROW_COUNT,)
time.sleep(3)
generate_updates_on_main(env, ep_main, i, end=100)
# Trigger GC
for shard, ps in tenant_get_shards(env, env.initial_tenant):
client = ps.http_client()
gc_result = client.timeline_gc(shard, env.initial_timeline, 0)
log.info(f"{gc_result=}")
assert (
gc_result["layers_removed"] == 0
), "No layers should be removed, old layers are guarded by leases."
with ep_static.cursor() as cur:
cur.execute("SELECT count(*) FROM t0")
assert cur.fetchone() == (ROW_COUNT,)
# Do some update so we can increment latest_gc_cutoff
generate_updates_on_main(env, ep_main, i, end=100)
# Now trigger GC again, layers should be removed.
time.sleep(4)
for shard, ps in tenant_get_shards(env, env.initial_tenant):
client = ps.http_client()
gc_result = client.timeline_gc(shard, env.initial_timeline, 0)
log.info(f"{gc_result=}")
assert gc_result["layers_removed"] > 0, "Old layers should be removed after leases expired."
# Similar test, but with more data, and we force checkpoints
def test_timetravel(neon_simple_env: NeonEnv):
env = neon_simple_env