mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-01 12:30:38 +00:00
Compare commits
13 Commits
cross_regi
...
startup-no
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
aa9baddd3d | ||
|
|
4756dcd0cc | ||
|
|
ab23e28768 | ||
|
|
341563261a | ||
|
|
b836013721 | ||
|
|
44ad006eb3 | ||
|
|
6bac770811 | ||
|
|
aff94b54c8 | ||
|
|
1adb38bb82 | ||
|
|
1baecdc27a | ||
|
|
eceda63379 | ||
|
|
881bfc4da8 | ||
|
|
eda4f86588 |
@@ -442,8 +442,42 @@ impl ComputeNode {
|
||||
|
||||
let pg = self.start_postgres(spec.storage_auth_token.clone())?;
|
||||
|
||||
// Maybe apply the spec
|
||||
if spec.spec.mode == ComputeMode::Primary {
|
||||
self.apply_config(&compute_state)?;
|
||||
let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec;
|
||||
|
||||
// Get spec_id or make it up by hashing
|
||||
//
|
||||
// TODO Make spec_id required so there would be no need to hash.
|
||||
let spec_id = spec.operation_uuid.clone().unwrap_or_else(|| {
|
||||
use std::collections::hash_map::DefaultHasher;
|
||||
use std::hash::{Hash, Hasher};
|
||||
|
||||
// HACK Exclude postgresql.conf because it doesn't need
|
||||
// to be applied like the other fields in the spec
|
||||
let mut spec_no_conf = spec.clone();
|
||||
spec_no_conf.cluster.postgresql_conf = None;
|
||||
|
||||
let json = serde_json::to_vec(&spec_no_conf).unwrap();
|
||||
let mut hasher = DefaultHasher::new();
|
||||
json.hash(&mut hasher);
|
||||
let hash = hasher.finish();
|
||||
format!("{:x}", hash)
|
||||
});
|
||||
|
||||
// Get current spec_id
|
||||
let path = Path::new(&self.pgdata).join("neon_compute_spec_id.txt");
|
||||
let current_spec_id = std::fs::read_to_string(path).ok();
|
||||
|
||||
// Respec if needed
|
||||
if current_spec_id == Some(spec_id.clone()) {
|
||||
info!("no need to respec");
|
||||
} else {
|
||||
info!("respeccing {:?} {:?}", current_spec_id, &spec_id);
|
||||
|
||||
self.apply_config(&compute_state)?;
|
||||
self.cache_spec_id(&compute_state, spec_id)?;
|
||||
}
|
||||
}
|
||||
|
||||
let startup_end_time = Utc::now();
|
||||
@@ -465,6 +499,28 @@ impl ComputeNode {
|
||||
Ok(pg)
|
||||
}
|
||||
|
||||
fn cache_spec_id(&self, compute_state: &ComputeState, spec_id: String) -> anyhow::Result<()> {
|
||||
let spec = &compute_state.pspec.as_ref().expect("spec must be set");
|
||||
let cmd = format!(
|
||||
"set_compute_spec_id {} {} {}",
|
||||
spec.tenant_id, spec.timeline_id, spec_id,
|
||||
);
|
||||
let mut config = postgres::Config::from_str(&spec.pageserver_connstr)?;
|
||||
|
||||
// Use the storage auth token from the config file, if given.
|
||||
// Note: this overrides any password set in the connection string.
|
||||
if let Some(storage_auth_token) = &spec.storage_auth_token {
|
||||
info!("Got storage auth token from spec file");
|
||||
config.password(storage_auth_token);
|
||||
} else {
|
||||
info!("Storage auth token not set");
|
||||
}
|
||||
let mut client = config.connect(NoTls)?;
|
||||
client.simple_query(&cmd)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Look for core dumps and collect backtraces.
|
||||
//
|
||||
// EKS worker nodes have following core dump settings:
|
||||
|
||||
@@ -417,6 +417,16 @@ where
|
||||
// Also send zenith.signal file with extra bootstrap data.
|
||||
//
|
||||
async fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> {
|
||||
// Add neon_compute_spec_id.txt
|
||||
if let Some(spec_id) = &self.timeline.compute_spec_id.lock().await.clone() {
|
||||
self.ar
|
||||
.append(
|
||||
&new_tar_header("neon_compute_spec_id.txt", spec_id.len() as u64)?,
|
||||
spec_id.as_bytes(),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// add zenith.signal file
|
||||
let mut zenith_signal = String::new();
|
||||
if self.prev_record_lsn == Lsn(0) {
|
||||
|
||||
@@ -915,6 +915,27 @@ where
|
||||
self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false, ctx)
|
||||
.await?;
|
||||
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
} else if query_string.starts_with("set_compute_spec_id ") {
|
||||
let (_, params_raw) = query_string.split_at("set_compute_spec_id ".len());
|
||||
let params = params_raw.split_whitespace().collect::<Vec<_>>();
|
||||
|
||||
if params.len() != 3 {
|
||||
return Err(QueryError::Other(anyhow::anyhow!(
|
||||
"invalid param number for set_compute_spec_id command"
|
||||
)));
|
||||
}
|
||||
|
||||
let tenant_id = TenantId::from_str(params[0])
|
||||
.with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
|
||||
let timeline_id = TimelineId::from_str(params[1])
|
||||
.with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
|
||||
let spec_id = params[2].to_string();
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
|
||||
*timeline.compute_spec_id.lock().await = Some(spec_id);
|
||||
|
||||
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
}
|
||||
// return pair of prev_lsn and last_lsn
|
||||
else if query_string.starts_with("get_last_record_rlsn ") {
|
||||
|
||||
@@ -215,6 +215,10 @@ pub struct Timeline {
|
||||
// though let's keep them both for better error visibility.
|
||||
pub initdb_lsn: Lsn,
|
||||
|
||||
// Compute nodes can set this field after successful application
|
||||
// of a new spec, in order to avoid reapplying it on next restart.
|
||||
pub compute_spec_id: tokio::sync::Mutex<Option<String>>,
|
||||
|
||||
/// When did we last calculate the partitioning?
|
||||
partitioning: Mutex<(KeyPartitioning, Lsn)>,
|
||||
|
||||
@@ -1456,6 +1460,7 @@ impl Timeline {
|
||||
|
||||
latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
|
||||
initdb_lsn: metadata.initdb_lsn(),
|
||||
compute_spec_id: tokio::sync::Mutex::new(None),
|
||||
|
||||
current_logical_size: if disk_consistent_lsn.is_valid() {
|
||||
// we're creating timeline data with some layer files existing locally,
|
||||
|
||||
@@ -2912,6 +2912,7 @@ SKIP_FILES = frozenset(
|
||||
"pg_internal.init",
|
||||
"pg.log",
|
||||
"zenith.signal",
|
||||
"neon_compute_spec_id.txt",
|
||||
"pg_hba.conf",
|
||||
"postgresql.conf",
|
||||
"postmaster.opts",
|
||||
|
||||
@@ -1,10 +1,63 @@
|
||||
from contextlib import closing
|
||||
|
||||
import pytest
|
||||
from fixtures.benchmark_fixture import NeonBenchmarker
|
||||
import requests
|
||||
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
|
||||
|
||||
# Just start and measure duration.
|
||||
#
|
||||
# This test runs pretty quickly and can be informative when used in combination
|
||||
# with emulated network delay. Some useful delay commands:
|
||||
#
|
||||
# 1. Add 2msec delay to all localhost traffic
|
||||
# `sudo tc qdisc add dev lo root handle 1:0 netem delay 2msec`
|
||||
#
|
||||
# 2. Test that it works (you should see 4ms ping)
|
||||
# `ping localhost`
|
||||
#
|
||||
# 3. Revert back to normal
|
||||
# `sudo tc qdisc del dev lo root netem`
|
||||
#
|
||||
# NOTE this test might not represent the real startup time because the basebackup
|
||||
# for a large database might be larger if there's a lof of transaction metadata,
|
||||
# or safekeepers might need more syncing, or there might be more operations to
|
||||
# apply during config step, like more users, databases, or extensions. By default
|
||||
# we load extensions 'neon,pg_stat_statements,timescaledb,pg_cron', but in this
|
||||
# test we only load neon.
|
||||
def test_startup_simple(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
env.neon_cli.create_branch("test_startup")
|
||||
|
||||
# We do two iterations so we can see if the second startup is faster. It should
|
||||
# be because the compute node should already be configured with roles, databases,
|
||||
# extensions, etc from the first run.
|
||||
for i in range(2):
|
||||
# Start
|
||||
with zenbenchmark.record_duration(f"{i}_start_and_select"):
|
||||
endpoint = env.endpoints.create_start("test_startup")
|
||||
endpoint.safe_psql("select 1;")
|
||||
|
||||
# Get metrics
|
||||
metrics = requests.get(f"http://localhost:{endpoint.http_port}/metrics.json").json()
|
||||
durations = {
|
||||
"wait_for_spec_ms": f"{i}_wait_for_spec",
|
||||
"sync_safekeepers_ms": f"{i}_sync_safekeepers",
|
||||
"basebackup_ms": f"{i}_basebackup",
|
||||
"config_ms": f"{i}_config",
|
||||
"total_startup_ms": f"{i}_total_startup",
|
||||
}
|
||||
for key, name in durations.items():
|
||||
value = metrics[key]
|
||||
zenbenchmark.record(name, value, "ms", report=MetricReport.LOWER_IS_BETTER)
|
||||
|
||||
# Stop so we can restart
|
||||
endpoint.stop()
|
||||
|
||||
|
||||
# This test sometimes runs for longer than the global 5 minute timeout.
|
||||
@pytest.mark.timeout(600)
|
||||
def test_startup(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
|
||||
|
||||
Reference in New Issue
Block a user