Compare commits

...

9 Commits

Author SHA1 Message Date
Yuchen Liang
402cb2c40a add allowed errors for leases in two tests that use static endpoints
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-06-20 17:28:29 -04:00
Yuchen Liang
fba409a158 avoid holding locks when a thread is sleeping
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-06-20 16:01:41 -04:00
Yuchen Liang
8baaccb56c Merge branch 'main' into prepor/lsn_leasing 2024-06-20 07:58:54 -04:00
Andrey Rudenko
06f953ad21 feedback: reread connstrings 2024-06-19 18:16:59 +02:00
Andrey Rudenko
66d0277571 feedback 2024-06-17 15:40:21 +02:00
Andrey Rudenko
3b2159310f fixes to protocol based on https://github.com/neondatabase/neon/pull/8039 2024-06-13 10:52:41 +02:00
Andrey Rudenko
ff55d6f4df feedback: using valid_until (it doesn't work currently) 2024-06-12 11:45:26 +02:00
Andrey Rudenko
3d1bcc9524 linter fix 2024-06-10 14:45:16 +02:00
Andrey Rudenko
ef90a6295e feat(compute_ctl): add periodic lease lsn requests for static computes 2024-06-10 14:45:16 +02:00
6 changed files with 135 additions and 10 deletions

View File

@@ -45,6 +45,7 @@ use std::{thread, time::Duration};
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Arg;
use compute_tools::lsn_lease::launch_lsn_lease_loop_for_static;
use signal_hook::consts::{SIGQUIT, SIGTERM};
use signal_hook::{consts::SIGINT, iterator::Signals};
use tracing::{error, info, warn};
@@ -363,6 +364,8 @@ fn wait_spec(
state.start_time = now;
}
launch_lsn_lease_loop_for_static(&compute);
Ok(WaitSpecResult {
compute,
http_port,

View File

@@ -11,6 +11,7 @@ pub mod logger;
pub mod catalog;
pub mod compute;
pub mod extension_server;
pub mod lsn_lease;
pub mod monitor;
pub mod params;
pub mod pg_helpers;

View File

@@ -0,0 +1,113 @@
use anyhow::bail;
use anyhow::Result;
use postgres::{NoTls, SimpleQueryMessage};
use std::{
str::FromStr,
sync::Arc,
thread,
time::{Duration, SystemTime},
};
use compute_api::spec::ComputeMode;
use tracing::{error, info};
use utils::lsn::Lsn;
use crate::compute::{ComputeNode, ComputeState};
/// Starts the background LSN lease thread when the compute runs in static mode.
///
/// Reads the compute mode from the current spec; for `ComputeMode::Static(lsn)`
/// a thread running [`lsn_lease_loop`] is spawned for that LSN. Any other mode
/// is a no-op.
///
/// # Panics
///
/// Panics if the state mutex is poisoned or if no spec has been set yet.
pub fn launch_lsn_lease_loop_for_static(compute: &Arc<ComputeNode>) {
    // Keep the state lock confined to this block so it is released before
    // the worker thread is spawned.
    let static_lsn = {
        let guard = compute.state.lock().unwrap();
        let pspec = guard.pspec.as_ref().expect("spec must be set");
        if let ComputeMode::Static(lsn) = pspec.spec.mode {
            Some(lsn)
        } else {
            None
        }
    };

    if let Some(lsn) = static_lsn {
        let worker_compute = Arc::clone(compute);
        thread::spawn(move || lsn_lease_loop(worker_compute, lsn));
    }
}
/// Builds one `postgres::Config` per entry of the spec's comma-separated
/// `pageserver_connstr`.
///
/// When the spec carries a `storage_auth_token`, it is set as the password on
/// every config.
///
/// # Panics
///
/// Panics if no spec has been set or if any connection string fails to parse.
fn postgres_configs_from_state(compute_state: &ComputeState) -> Vec<postgres::Config> {
    let pspec = compute_state.pspec.as_ref().expect("spec must be set");

    let mut configs = Vec::new();
    for connstr in pspec.pageserver_connstr.split(',') {
        let mut config = postgres::Config::from_str(connstr).expect("invalid connstr");
        match &pspec.storage_auth_token {
            Some(token) => {
                info!("Got storage auth token from spec file");
                config.password(token.clone());
            }
            None => {
                info!("Storage auth token not set");
            }
        }
        configs.push(config);
    }
    configs
}
/// Periodically requests an LSN lease for `lsn` from the pageserver(s).
///
/// Runs forever. Each iteration re-reads the connection strings and the
/// tenant/timeline ids from the current compute state, sends a `lease lsn`
/// command through [`lsn_lease_request`], and then sleeps:
///
/// * on success, until 60 seconds before the lease expires, but never less
///   than 60 seconds;
/// * on failure, 10 seconds before retrying.
///
/// # Panics
///
/// Panics if the state mutex is poisoned or if no spec has been set.
fn lsn_lease_loop(compute: Arc<ComputeNode>, lsn: Lsn) {
    loop {
        // Hold the state lock only while extracting what the request needs,
        // never across the network round trip or the sleep below.
        let (configs, cmd) = {
            let state = compute.state.lock().unwrap();
            let spec = state.pspec.as_ref().expect("spec must be set");
            let configs = postgres_configs_from_state(&state);
            let cmd = format!("lease lsn {} {} {} ", spec.tenant_id, spec.timeline_id, lsn);
            (configs, cmd)
        };

        match lsn_lease_request(&configs, &cmd) {
            Ok(valid_until) => {
                let now_since_epoch = SystemTime::now()
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .unwrap_or(Duration::ZERO);
                let sleep_duration = lease_renewal_delay(valid_until, now_since_epoch);
                info!(
                    "lsn_lease_request succeeded, sleeping for {} seconds",
                    sleep_duration.as_secs()
                );
                thread::sleep(sleep_duration);
            }
            Err(e) => {
                error!("lsn_lease_request failed: {:#}", e);
                thread::sleep(Duration::from_secs(10));
            }
        }
    }
}

/// Computes how long to wait before renewing a lease.
///
/// `valid_until_millis` is treated as the lease expiry in milliseconds since
/// the Unix epoch, and `now_since_epoch` as the current time on the same
/// scale. The result is the remaining lease time minus a 60 second safety
/// margin, clamped to at least 60 seconds to avoid busy loops.
fn lease_renewal_delay(valid_until_millis: u128, now_since_epoch: Duration) -> Duration {
    const RENEWAL_MARGIN: Duration = Duration::from_secs(60);
    const MIN_SLEEP: Duration = Duration::from_secs(60);

    // Saturate instead of truncating if the value does not fit in u64.
    let valid_until =
        Duration::from_millis(u64::try_from(valid_until_millis).unwrap_or(u64::MAX));

    // Remaining lease time is `valid_until - now`; the reverse subtraction
    // is (almost) always negative and would pin the delay to the minimum.
    valid_until
        .saturating_sub(now_since_epoch)
        .saturating_sub(RENEWAL_MARGIN)
        .max(MIN_SLEEP)
}
/// Sends `cmd` to every pageserver in `configs` and returns the smallest
/// `valid_until` value reported.
///
/// Each config is connected to in order (without TLS) and `cmd` is run as a
/// simple query. The first message of the response must be a row with a
/// non-null `valid_until` column that parses as an unsigned integer.
///
/// # Errors
///
/// Returns an error, without contacting the remaining pageservers, as soon as
/// a connection or query fails or a response is empty, is not a row, lacks
/// the `valid_until` column or has it NULL, or the value does not parse.
/// Also returns an error if `configs` is empty.
fn lsn_lease_request(configs: &[postgres::Config], cmd: &str) -> Result<u128> {
    info!("lsn_lease_request: {}", cmd);

    // The minimum across all responses is the point up to which every
    // contacted pageserver reported the lease as valid.
    let mut earliest: Option<u128> = None;
    for config in configs {
        let mut client = config.connect(NoTls)?;
        let res = client.simple_query(cmd)?;
        let msg = match res.first() {
            Some(msg) => msg,
            None => bail!("empty response"),
        };
        let row = match msg {
            SimpleQueryMessage::Row(row) => row,
            _ => bail!("error parsing lsn lease response"),
        };
        // `try_get` reports a missing column as an error; `get` would panic
        // and take down the lease thread.
        let valid_until_str = match row.try_get("valid_until")? {
            Some(valid_until) => valid_until,
            None => bail!("valid_until not found"),
        };
        let valid_until = u128::from_str(valid_until_str)?;
        earliest = Some(earliest.map_or(valid_until, |current| current.min(valid_until)));
    }

    match earliest {
        Some(valid_until) => Ok(valid_until),
        None => bail!("no pageserver connection strings to request a lease from"),
    }
}

View File

@@ -945,9 +945,8 @@ impl PageServerHandler {
b"valid_until",
)]))?
.write_message_noflush(&BeMessage::DataRow(&[Some(
&valid_until.as_millis().to_be_bytes(),
)]))?
.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
valid_until.as_millis().to_string().as_bytes(),
)]))?;
Ok(())
}

View File

@@ -88,6 +88,9 @@ def test_ondemand_download_replica(neon_env_builder: NeonEnvBuilder, shard_count
initial_tenant_conf=tenant_conf, initial_tenant_shard_count=shard_count
)
for ps in env.pageservers:
ps.allowed_errors.append(".*page_service.*error obtaining lsn lease.*Tenant .* not found")
endpoint = env.endpoints.create_start("main")
pg_conn = endpoint.connect()

View File

@@ -6,18 +6,24 @@ from fixtures.pageserver.utils import wait_for_last_record_lsn
from fixtures.utils import query_scalar
#
# Create read-only compute nodes, anchored at historical points in time.
#
# This is very similar to the 'test_branch_behind' test, but instead of
# creating branches, creates read-only nodes.
#
def test_readonly_node(neon_simple_env: NeonEnv):
"""
Create read-only compute nodes, anchored at historical points in time.
This is very similar to the 'test_branch_behind' test, but instead of
creating branches, creates read-only nodes.
"""
env = neon_simple_env
env.neon_cli.create_branch("test_readonly_node", "empty")
endpoint_main = env.endpoints.create_start("test_readonly_node")
env.pageserver.allowed_errors.append(".*basebackup .* failed: invalid basebackup lsn.*")
env.pageserver.allowed_errors.extend(
[
".*basebackup .* failed: invalid basebackup lsn.*",
".*page_service.*error obtaining lsn lease.*.*tried to request a page version that was garbage collected",
]
)
main_pg_conn = endpoint_main.connect()
main_cur = main_pg_conn.cursor()