mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 22:10:39 +00:00
cold starts: Add sync-safekeepers fast path (#4804)
This commit is contained in:
@@ -8,9 +8,11 @@ use std::sync::{Condvar, Mutex};
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use chrono::{DateTime, Utc};
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::StreamExt;
|
||||
use postgres::{Client, NoTls};
|
||||
use tokio_postgres;
|
||||
use tracing::{info, instrument, warn};
|
||||
use tracing::{error, info, instrument, warn};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
@@ -21,6 +23,7 @@ use utils::measured_stream::MeasuredReader;
|
||||
use crate::config;
|
||||
use crate::pg_helpers::*;
|
||||
use crate::spec::*;
|
||||
use crate::sync_sk::{check_if_synced, ping_safekeeper};
|
||||
|
||||
/// Compute node info shared across several `compute_ctl` threads.
|
||||
pub struct ComputeNode {
|
||||
@@ -309,6 +312,101 @@ impl ComputeNode {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn check_safekeepers_synced_async(
|
||||
&self,
|
||||
compute_state: &ComputeState,
|
||||
) -> Result<Option<Lsn>> {
|
||||
// Construct a connection config for each safekeeper
|
||||
let pspec: ParsedSpec = compute_state
|
||||
.pspec
|
||||
.as_ref()
|
||||
.expect("spec must be set")
|
||||
.clone();
|
||||
let sk_connstrs: Vec<String> = pspec.spec.safekeeper_connstrings.clone();
|
||||
let sk_configs = sk_connstrs.into_iter().map(|connstr| {
|
||||
// Format connstr
|
||||
let connstr = format!("postgresql://no_user@{}", connstr);
|
||||
let options = format!(
|
||||
"-c timeline_id={} tenant_id={}",
|
||||
pspec.timeline_id, pspec.tenant_id
|
||||
);
|
||||
|
||||
// Construct client
|
||||
let mut config = tokio_postgres::Config::from_str(&connstr).unwrap();
|
||||
config.options(&options);
|
||||
if let Some(storage_auth_token) = pspec.storage_auth_token.clone() {
|
||||
config.password(storage_auth_token);
|
||||
}
|
||||
|
||||
config
|
||||
});
|
||||
|
||||
// Create task set to query all safekeepers
|
||||
let mut tasks = FuturesUnordered::new();
|
||||
let quorum = sk_configs.len() / 2 + 1;
|
||||
for config in sk_configs {
|
||||
let timeout = tokio::time::Duration::from_millis(100);
|
||||
let task = tokio::time::timeout(timeout, ping_safekeeper(config));
|
||||
tasks.push(tokio::spawn(task));
|
||||
}
|
||||
|
||||
// Get a quorum of responses or errors
|
||||
let mut responses = Vec::new();
|
||||
let mut join_errors = Vec::new();
|
||||
let mut task_errors = Vec::new();
|
||||
let mut timeout_errors = Vec::new();
|
||||
while let Some(response) = tasks.next().await {
|
||||
match response {
|
||||
Ok(Ok(Ok(r))) => responses.push(r),
|
||||
Ok(Ok(Err(e))) => task_errors.push(e),
|
||||
Ok(Err(e)) => timeout_errors.push(e),
|
||||
Err(e) => join_errors.push(e),
|
||||
};
|
||||
if responses.len() >= quorum {
|
||||
break;
|
||||
}
|
||||
if join_errors.len() + task_errors.len() + timeout_errors.len() >= quorum {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// In case of error, log and fail the check, but don't crash.
|
||||
// We're playing it safe because these errors could be transient
|
||||
// and we don't yet retry. Also being careful here allows us to
|
||||
// be backwards compatible with safekeepers that don't have the
|
||||
// TIMELINE_STATUS API yet.
|
||||
if responses.len() < quorum {
|
||||
error!(
|
||||
"failed sync safekeepers check {:?} {:?} {:?}",
|
||||
join_errors, task_errors, timeout_errors
|
||||
);
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
Ok(check_if_synced(responses))
|
||||
}
|
||||
|
||||
// Fast path for sync_safekeepers. If they're already synced we get the lsn
|
||||
// in one roundtrip. If not, we should do a full sync_safekeepers.
|
||||
pub fn check_safekeepers_synced(&self, compute_state: &ComputeState) -> Result<Option<Lsn>> {
|
||||
let start_time = Utc::now();
|
||||
|
||||
// Run actual work with new tokio runtime
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("failed to create rt");
|
||||
let result = rt.block_on(self.check_safekeepers_synced_async(compute_state));
|
||||
|
||||
// Record runtime
|
||||
self.state.lock().unwrap().metrics.sync_sk_check_ms = Utc::now()
|
||||
.signed_duration_since(start_time)
|
||||
.to_std()
|
||||
.unwrap()
|
||||
.as_millis() as u64;
|
||||
result
|
||||
}
|
||||
|
||||
// Run `postgres` in a special mode with `--sync-safekeepers` argument
|
||||
// and return the reported LSN back to the caller.
|
||||
#[instrument(skip_all)]
|
||||
@@ -371,10 +469,14 @@ impl ComputeNode {
|
||||
// cannot sync safekeepers.
|
||||
let lsn = match spec.mode {
|
||||
ComputeMode::Primary => {
|
||||
info!("starting safekeepers syncing");
|
||||
let lsn = self
|
||||
.sync_safekeepers(pspec.storage_auth_token.clone())
|
||||
.with_context(|| "failed to sync safekeepers")?;
|
||||
info!("checking if safekeepers are synced");
|
||||
let lsn = if let Ok(Some(lsn)) = self.check_safekeepers_synced(compute_state) {
|
||||
lsn
|
||||
} else {
|
||||
info!("starting safekeepers syncing");
|
||||
self.sync_safekeepers(pspec.storage_auth_token.clone())
|
||||
.with_context(|| "failed to sync safekeepers")?
|
||||
};
|
||||
info!("safekeepers synced at LSN {}", lsn);
|
||||
lsn
|
||||
}
|
||||
|
||||
@@ -13,3 +13,4 @@ pub mod monitor;
|
||||
pub mod params;
|
||||
pub mod pg_helpers;
|
||||
pub mod spec;
|
||||
pub mod sync_sk;
|
||||
|
||||
95
compute_tools/src/sync_sk.rs
Normal file
95
compute_tools/src/sync_sk.rs
Normal file
@@ -0,0 +1,95 @@
|
||||
// Utils for running sync_safekeepers
|
||||
use anyhow::Result;
|
||||
use tracing::info;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub enum TimelineStatusResponse {
|
||||
NotFound,
|
||||
Ok(TimelineStatusOkResponse),
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub struct TimelineStatusOkResponse {
|
||||
flush_lsn: Lsn,
|
||||
commit_lsn: Lsn,
|
||||
}
|
||||
|
||||
/// Get a safekeeper's metadata for our timeline
|
||||
pub async fn ping_safekeeper(config: tokio_postgres::Config) -> Result<TimelineStatusResponse> {
|
||||
// TODO add retries
|
||||
|
||||
// Connect
|
||||
info!("connecting to {:?}", config);
|
||||
let (client, conn) = config.connect(tokio_postgres::NoTls).await?;
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = conn.await {
|
||||
eprintln!("connection error: {}", e);
|
||||
}
|
||||
});
|
||||
|
||||
// Query
|
||||
info!("querying {:?}", config);
|
||||
let result = client.simple_query("TIMELINE_STATUS").await?;
|
||||
|
||||
// Parse result
|
||||
info!("done with {:?}", config);
|
||||
if let postgres::SimpleQueryMessage::Row(row) = &result[0] {
|
||||
use std::str::FromStr;
|
||||
let response = TimelineStatusResponse::Ok(TimelineStatusOkResponse {
|
||||
flush_lsn: Lsn::from_str(row.get("flush_lsn").unwrap())?,
|
||||
commit_lsn: Lsn::from_str(row.get("commit_lsn").unwrap())?,
|
||||
});
|
||||
Ok(response)
|
||||
} else {
|
||||
// Timeline doesn't exist
|
||||
Ok(TimelineStatusResponse::NotFound)
|
||||
}
|
||||
}
|
||||
|
||||
/// Given a quorum of responses, check if safekeepers are synced at some Lsn
|
||||
pub fn check_if_synced(responses: Vec<TimelineStatusResponse>) -> Option<Lsn> {
|
||||
// Check if all responses are ok
|
||||
let ok_responses: Vec<TimelineStatusOkResponse> = responses
|
||||
.iter()
|
||||
.filter_map(|r| match r {
|
||||
TimelineStatusResponse::Ok(ok_response) => Some(ok_response),
|
||||
_ => None,
|
||||
})
|
||||
.cloned()
|
||||
.collect();
|
||||
if ok_responses.len() < responses.len() {
|
||||
info!(
|
||||
"not synced. Only {} out of {} know about this timeline",
|
||||
ok_responses.len(),
|
||||
responses.len()
|
||||
);
|
||||
return None;
|
||||
}
|
||||
|
||||
// Get the min and the max of everything
|
||||
let commit: Vec<Lsn> = ok_responses.iter().map(|r| r.commit_lsn).collect();
|
||||
let flush: Vec<Lsn> = ok_responses.iter().map(|r| r.flush_lsn).collect();
|
||||
let commit_max = commit.iter().max().unwrap();
|
||||
let commit_min = commit.iter().min().unwrap();
|
||||
let flush_max = flush.iter().max().unwrap();
|
||||
let flush_min = flush.iter().min().unwrap();
|
||||
|
||||
// Check that all values are equal
|
||||
if commit_min != commit_max {
|
||||
info!("not synced. {:?} {:?}", commit_min, commit_max);
|
||||
return None;
|
||||
}
|
||||
if flush_min != flush_max {
|
||||
info!("not synced. {:?} {:?}", flush_min, flush_max);
|
||||
return None;
|
||||
}
|
||||
|
||||
// Check that commit == flush
|
||||
if commit_max != flush_max {
|
||||
info!("not synced. {:?} {:?}", commit_max, flush_max);
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(*commit_max)
|
||||
}
|
||||
@@ -70,6 +70,7 @@ where
|
||||
pub struct ComputeMetrics {
|
||||
pub wait_for_spec_ms: u64,
|
||||
pub sync_safekeepers_ms: u64,
|
||||
pub sync_sk_check_ms: u64,
|
||||
pub basebackup_ms: u64,
|
||||
pub basebackup_bytes: u64,
|
||||
pub start_postgres_ms: u64,
|
||||
|
||||
@@ -11,6 +11,7 @@ use crate::auth::check_permission;
|
||||
use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
|
||||
|
||||
use crate::metrics::{TrafficMetrics, PG_QUERIES_FINISHED, PG_QUERIES_RECEIVED};
|
||||
use crate::timeline::TimelineError;
|
||||
use crate::wal_service::ConnectionId;
|
||||
use crate::{GlobalTimelines, SafeKeeperConf};
|
||||
use postgres_backend::QueryError;
|
||||
@@ -45,6 +46,7 @@ enum SafekeeperPostgresCommand {
|
||||
StartWalPush,
|
||||
StartReplication { start_lsn: Lsn },
|
||||
IdentifySystem,
|
||||
TimelineStatus,
|
||||
JSONCtrl { cmd: AppendLogicalMessage },
|
||||
}
|
||||
|
||||
@@ -64,6 +66,8 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
|
||||
Ok(SafekeeperPostgresCommand::StartReplication { start_lsn })
|
||||
} else if cmd.starts_with("IDENTIFY_SYSTEM") {
|
||||
Ok(SafekeeperPostgresCommand::IdentifySystem)
|
||||
} else if cmd.starts_with("TIMELINE_STATUS") {
|
||||
Ok(SafekeeperPostgresCommand::TimelineStatus)
|
||||
} else if cmd.starts_with("JSON_CTRL") {
|
||||
let cmd = cmd.strip_prefix("JSON_CTRL").context("invalid prefix")?;
|
||||
Ok(SafekeeperPostgresCommand::JSONCtrl {
|
||||
@@ -78,6 +82,7 @@ fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str {
|
||||
match cmd {
|
||||
SafekeeperPostgresCommand::StartWalPush => "START_WAL_PUSH",
|
||||
SafekeeperPostgresCommand::StartReplication { .. } => "START_REPLICATION",
|
||||
SafekeeperPostgresCommand::TimelineStatus => "TIMELINE_STATUS",
|
||||
SafekeeperPostgresCommand::IdentifySystem => "IDENTIFY_SYSTEM",
|
||||
SafekeeperPostgresCommand::JSONCtrl { .. } => "JSON_CTRL",
|
||||
}
|
||||
@@ -219,6 +224,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
.await
|
||||
}
|
||||
SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb).await,
|
||||
SafekeeperPostgresCommand::TimelineStatus => self.handle_timeline_status(pgb).await,
|
||||
SafekeeperPostgresCommand::JSONCtrl { ref cmd } => {
|
||||
handle_json_ctrl(self, pgb, cmd).await
|
||||
}
|
||||
@@ -263,6 +269,38 @@ impl SafekeeperPostgresHandler {
|
||||
check_permission(claims, tenant_id)
|
||||
}
|
||||
|
||||
async fn handle_timeline_status<IO: AsyncRead + AsyncWrite + Unpin>(
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
) -> Result<(), QueryError> {
|
||||
// Get timeline, handling "not found" error
|
||||
let tli = match GlobalTimelines::get(self.ttid) {
|
||||
Ok(tli) => Ok(Some(tli)),
|
||||
Err(TimelineError::NotFound(_)) => Ok(None),
|
||||
Err(e) => Err(QueryError::Other(e.into())),
|
||||
}?;
|
||||
|
||||
// Write row description
|
||||
pgb.write_message_noflush(&BeMessage::RowDescription(&[
|
||||
RowDescriptor::text_col(b"flush_lsn"),
|
||||
RowDescriptor::text_col(b"commit_lsn"),
|
||||
]))?;
|
||||
|
||||
// Write row if timeline exists
|
||||
if let Some(tli) = tli {
|
||||
let (inmem, _state) = tli.get_state().await;
|
||||
let flush_lsn = tli.get_flush_lsn().await;
|
||||
let commit_lsn = inmem.commit_lsn;
|
||||
pgb.write_message_noflush(&BeMessage::DataRow(&[
|
||||
Some(flush_lsn.to_string().as_bytes()),
|
||||
Some(commit_lsn.to_string().as_bytes()),
|
||||
]))?;
|
||||
}
|
||||
|
||||
pgb.write_message_noflush(&BeMessage::CommandComplete(b"TIMELINE_STATUS"))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// Handle IDENTIFY_SYSTEM replication command
|
||||
///
|
||||
|
||||
@@ -61,6 +61,7 @@ def test_startup_simple(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenc
|
||||
durations = {
|
||||
"wait_for_spec_ms": f"{i}_wait_for_spec",
|
||||
"sync_safekeepers_ms": f"{i}_sync_safekeepers",
|
||||
"sync_sk_check_ms": f"{i}_sync_sk_check",
|
||||
"basebackup_ms": f"{i}_basebackup",
|
||||
"start_postgres_ms": f"{i}_start_postgres",
|
||||
"config_ms": f"{i}_config",
|
||||
|
||||
Reference in New Issue
Block a user