Mirror of https://github.com/neondatabase/neon.git
Compare commits: readonly-n ... instrument
3 Commits
| Author | SHA1 | Date |
|---|---|---|
| | 0cb37311c2 | |
| | 1b687409be | |
| | 694bc1a481 | |
Cargo.lock (generated, 1 change)
@@ -4666,6 +4666,7 @@ dependencies = [
  "opentelemetry",
  "opentelemetry-otlp",
  "opentelemetry-semantic-conventions",
+ "pin-project-lite",
  "reqwest",
  "tokio",
  "tracing",
@@ -16,3 +16,4 @@ tracing-opentelemetry.workspace = true
 tracing-subscriber.workspace = true
 
 workspace_hack.workspace = true
+pin-project-lite = "0.2"
libs/tracing-utils/src/instrument.rs (new file, 72 lines)
@@ -0,0 +1,72 @@
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

use pin_project_lite::pin_project;
use tracing::{instrument::Instrumented, Level};

pub trait InstrumentCancel: Sized {
    type Inner;
    fn with_cancel_info(self) -> CancelLog<Self::Inner> {
        self.with_cancel_log(Level::INFO)
    }
    fn with_cancel_log(self, level: Level) -> CancelLog<Self::Inner>;
}

impl<T: Sized> InstrumentCancel for Instrumented<T> {
    type Inner = T;
    fn with_cancel_log(self, level: Level) -> CancelLog<T> {
        CancelLog {
            inner: self,
            cancel_level: Some(level),
        }
    }
}

pin_project! {
    /// A [`Future`] that has been instrumented with a `tracing` [`Span`]. It will emit a log on cancellation
    ///
    /// This type is returned by the [`Instrument`] extension trait. See that
    /// trait's documentation for details.
    ///
    /// [`Future`]: std::future::Future
    /// [`Span`]: crate::Span
    #[derive(Debug, Clone)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct CancelLog<T> {
        #[pin]
        inner: Instrumented<T>,
        cancel_level: Option<Level>,
    }

    impl<T> PinnedDrop for CancelLog<T> {
        fn drop(this: Pin<&mut Self>) {
            let this = this.project();
            if let Some(level) = this.cancel_level.take() {
                let _enter = this.inner.span().enter();
                match level {
                    Level::TRACE => tracing::event!(Level::TRACE, "task was cancelled"),
                    Level::DEBUG => tracing::event!(Level::DEBUG, "task was cancelled"),
                    Level::INFO => tracing::event!(Level::INFO, "task was cancelled"),
                    Level::WARN => tracing::event!(Level::WARN, "task was cancelled"),
                    Level::ERROR => tracing::event!(Level::ERROR, "task was cancelled"),
                }
            }
        }
    }
}

impl<T: Future> Future for CancelLog<T> {
    type Output = T::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let res = this.inner.poll(cx);
        if res.is_ready() {
            *this.cancel_level = None;
        }
        res
    }
}
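A minimal usage sketch, not part of the diff, of the extension trait above: wrapping an instrumented future via `with_cancel_info()` makes a drop-before-completion emit "task was cancelled" inside the future's span, while a future that is polled to completion clears the level and stays silent. The span name, the spawned function, and the abort scaffolding below are illustrative only.

```rust
use tracing::{info_span, Instrument};
use tracing_utils::instrument::InstrumentCancel;

async fn handle_connection() {
    // ... serve one connection ...
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();

    let task = tokio::spawn(
        handle_connection()
            .instrument(info_span!("mgmt_conn"))
            // Logs "task was cancelled" at INFO if the future is dropped
            // before it resolves; running to completion suppresses the log.
            .with_cancel_info(),
    );

    // Aborting drops the spawned future mid-flight, which triggers the log.
    task.abort();
    let _ = task.await;
}
```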
@@ -41,6 +41,7 @@ use opentelemetry_otlp::{OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_TRACES_
 pub use tracing_opentelemetry::OpenTelemetryLayer;
 
 pub mod http;
+pub mod instrument;
 
 /// Set up OpenTelemetry exporter, using configuration from environment variables.
 ///
@@ -9,6 +9,7 @@ use pq_proto::{BeMessage, SINGLE_COL_ROWDESC};
 use std::future;
 use tokio::net::{TcpListener, TcpStream};
 use tracing::{error, info, info_span, Instrument};
+use tracing_utils::instrument::InstrumentCancel;
 
 static CPLANE_WAITERS: Lazy<Waiters<ComputeReady>> = Lazy::new(Default::default);
 
@@ -50,23 +51,14 @@ pub async fn task_main(listener: TcpListener) -> anyhow::Result<()> {
             async move {
                 info!("serving a new console management API connection");
 
-                // these might be long running connections, have a separate logging for cancelling
-                // on shutdown and other ways of stopping.
-                let cancelled = scopeguard::guard(tracing::Span::current(), |span| {
-                    let _e = span.entered();
-                    info!("console management API task cancelled");
-                });
-
                 if let Err(e) = handle_connection(socket).await {
                     error!("serving failed with an error: {e}");
                 } else {
                     info!("serving completed");
                 }
-
-                // we can no longer get dropped
-                scopeguard::ScopeGuard::into_inner(cancelled);
             }
-            .instrument(span),
+            .instrument(span)
+            .with_cancel_info(),
         );
     }
 }
@@ -1,6 +1,7 @@
 use parking_lot::Mutex;
 use pq_proto::StartupMessageParams;
 use std::fmt;
+use std::ops::ControlFlow;
 use std::{collections::HashMap, sync::Arc};
 use tokio::time;
 
@@ -9,8 +10,7 @@ use crate::{auth, console};
 
 use super::sql_over_http::MAX_RESPONSE_SIZE;
 
-use crate::proxy::try_wake;
-use crate::proxy::{BASE_RETRY_WAIT_DURATION, NUM_RETRIES_WAKE_COMPUTE};
+use crate::proxy::{invalidate_cache, retry_after, try_wake, NUM_RETRIES_WAKE_COMPUTE};
 
 use tracing::error;
 use tracing::info;
@@ -184,11 +184,10 @@ impl GlobalConnPool {
     }
 }
 
-//
 // Wake up the destination if needed. Code here is a bit involved because
 // we reuse the code from the usual proxy and we need to prepare few structures
 // that this code expects.
-//
+#[tracing::instrument(skip_all)]
 async fn connect_to_compute(
     config: &config::ProxyConfig,
     conn_info: &ConnInfo,
@@ -220,54 +219,72 @@ async fn connect_to_compute(
 
     let node_info = &mut creds.wake_compute(&extra).await?.expect("msg");
 
     // This code is a copy of `connect_to_compute` from `src/proxy.rs` with
     // the difference that it uses `tokio_postgres` for the connection.
     let mut num_retries = 0;
-    let mut should_wake = true;
+    let mut wait_duration = time::Duration::ZERO;
+    let mut should_wake_with_error = None;
     loop {
+        if !wait_duration.is_zero() {
+            time::sleep(wait_duration).await;
+        }
+
+        // try wake the compute node if we have determined it's sensible to do so
+        if let Some(err) = should_wake_with_error.take() {
+            match try_wake(node_info, &extra, &creds).await {
+                // we can't wake up the compute node
+                Ok(None) => return Err(err),
+                // there was an error communicating with the control plane
+                Err(e) => return Err(e.into()),
+                // failed to wake up but we can continue to retry
+                Ok(Some(ControlFlow::Continue(()))) => {
+                    wait_duration = retry_after(num_retries);
+                    should_wake_with_error = Some(err);
+
+                    num_retries += 1;
+                    info!(num_retries, "retrying wake compute");
+                    continue;
+                }
+                // successfully woke up a compute node and can break the wakeup loop
+                Ok(Some(ControlFlow::Break(()))) => {}
+            }
+        }
+
         match connect_to_compute_once(node_info, conn_info).await {
-            Err(e) if num_retries == NUM_RETRIES_WAKE_COMPUTE => {
-                if let Some(wait_duration) = retry_connect_in(&e, num_retries) {
-                    error!(error = ?e, "could not connect to compute node");
-                    if should_wake {
-                        match try_wake(node_info, &extra, &creds).await {
-                            Ok(Some(x)) => should_wake = x,
-                            Ok(None) => return Err(e.into()),
-                            Err(e) => return Err(e.into()),
-                        }
-                    }
-                    if !wait_duration.is_zero() {
-                        time::sleep(wait_duration).await;
-                    }
-                } else {
+            Ok(res) => return Ok(res),
+            Err(e) => {
+                error!(error = ?e, "could not connect to compute node");
+                if !can_retry_error(&e, num_retries) {
                     return Err(e.into());
                 }
+                wait_duration = retry_after(num_retries);
+
+                // after the first connect failure,
+                // we should invalidate the cache and wake up a new compute node
+                if num_retries == 0 {
+                    invalidate_cache(node_info);
+                    should_wake_with_error = Some(e.into());
+                }
             }
-            other => return Ok(other?),
         }
 
         num_retries += 1;
-        info!(retries_left = num_retries, "retrying connect");
+        info!(num_retries, "retrying connect");
     }
 }
 
-fn retry_connect_in(err: &tokio_postgres::Error, num_retries: u32) -> Option<time::Duration> {
+fn can_retry_error(err: &tokio_postgres::Error, num_retries: u32) -> bool {
     use tokio_postgres::error::SqlState;
     match err.code() {
-        // retry all errors at least once immediately
-        _ if num_retries == 0 => Some(time::Duration::ZERO),
-        // keep retrying connection errors every 100ms
+        // retry all errors at least once
+        _ if num_retries == 0 => true,
+        // keep retrying connection errors
         Some(
             &SqlState::CONNECTION_FAILURE
                 | &SqlState::CONNECTION_EXCEPTION
                 | &SqlState::CONNECTION_DOES_NOT_EXIST
                 | &SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION,
-        ) => {
-            // 3/2 = 1.5 which seems to be an ok growth factor heuristic
-            Some(BASE_RETRY_WAIT_DURATION * 3_u32.pow(num_retries) / 2_u32.pow(num_retries))
-        }
+        ) if num_retries < NUM_RETRIES_WAKE_COMPUTE => true,
         // otherwise, don't retry
-        _ => None,
+        _ => false,
     }
 }
@@ -20,7 +20,7 @@ use hyper::StatusCode;
 use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec};
 use once_cell::sync::Lazy;
 use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams};
-use std::sync::Arc;
+use std::{ops::ControlFlow, sync::Arc};
 use tokio::{
     io::{AsyncRead, AsyncWrite, AsyncWriteExt},
     time,
@@ -32,7 +32,7 @@ use utils::measured_stream::MeasuredStream;
 /// Number of times we should retry the `/proxy_wake_compute` http request.
 /// Retry duration is BASE_RETRY_WAIT_DURATION * 1.5^n
 pub const NUM_RETRIES_WAKE_COMPUTE: u32 = 10;
-pub const BASE_RETRY_WAIT_DURATION: time::Duration = time::Duration::from_millis(100);
+const BASE_RETRY_WAIT_DURATION: time::Duration = time::Duration::from_millis(100);
 
 const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
 const ERR_PROTO_VIOLATION: &str = "protocol violation";
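The doc comment above pins the retry schedule to BASE_RETRY_WAIT_DURATION * 1.5^n; in the code the factor is written as 3^n / 2^n so the arithmetic stays in exact `Duration` math. A small self-contained sketch of the resulting delays, mirroring the `retry_after` helper added further down in this diff (the standalone constant and `main` wrapper are illustrative only):

```rust
use std::time::Duration;

const BASE_RETRY_WAIT_DURATION: Duration = Duration::from_millis(100);

// Mirrors the retry_after() helper from this diff: retry 0 is immediate,
// later retries grow by a factor of 1.5 per attempt.
fn retry_after(num_retries: u32) -> Duration {
    match num_retries {
        0 => Duration::ZERO,
        _ => BASE_RETRY_WAIT_DURATION * 3_u32.pow(num_retries) / 2_u32.pow(num_retries),
    }
}

fn main() {
    for n in 0..5 {
        // Prints roughly: 0ns, 150ms, 225ms, 337.5ms, 506.25ms
        println!("retry {n}: wait {:?}", retry_after(n));
    }
}
```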
@@ -335,11 +335,37 @@ async fn connect_to_compute(
     creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>,
 ) -> Result<PostgresConnection, compute::ConnectionError> {
     let mut num_retries = 0;
-    let mut should_wake = true;
+    let mut wait_duration = time::Duration::ZERO;
+    let mut should_wake_with_error = None;
     loop {
         // Apply startup params to the (possibly, cached) compute node info.
         node_info.config.set_startup_params(params);
 
+        if !wait_duration.is_zero() {
+            time::sleep(wait_duration).await;
+        }
+
+        // try wake the compute node if we have determined it's sensible to do so
+        if let Some(err) = should_wake_with_error.take() {
+            match try_wake(node_info, extra, creds).await {
+                // we can't wake up the compute node
+                Ok(None) => return Err(err),
+                // there was an error communicating with the control plane
+                Err(e) => return Err(io_error(e).into()),
+                // failed to wake up but we can continue to retry
+                Ok(Some(ControlFlow::Continue(()))) => {
+                    wait_duration = retry_after(num_retries);
+                    should_wake_with_error = Some(err);
+
+                    num_retries += 1;
+                    info!(num_retries, "retrying wake compute");
+                    continue;
+                }
+                // successfully woke up a compute node and can break the wakeup loop
+                Ok(Some(ControlFlow::Break(()))) => {}
+            }
+        }
+
         // Set a shorter timeout for the initial connection attempt.
         //
         // In case we try to connect to an outdated address that is no longer valid, the
@@ -360,30 +386,25 @@ async fn connect_to_compute(
         };
 
         match connect_to_compute_once(node_info, timeout).await {
-            Err(e) if num_retries < NUM_RETRIES_WAKE_COMPUTE => {
-                if let Some(wait_duration) = retry_connect_in(&e, num_retries) {
-                    error!(error = ?e, "could not connect to compute node");
-                    if should_wake {
-                        match try_wake(node_info, extra, creds).await {
-                            Ok(Some(x)) => {
-                                should_wake = x;
-                            }
-                            Ok(None) => return Err(e),
-                            Err(e) => return Err(io_error(e).into()),
-                        }
-                    }
-                    if !wait_duration.is_zero() {
-                        time::sleep(wait_duration).await;
-                    }
-                } else {
+            Ok(res) => return Ok(res),
+            Err(e) => {
+                error!(error = ?e, "could not connect to compute node");
+                if !can_retry_error(&e, num_retries) {
                     return Err(e);
                 }
+                wait_duration = retry_after(num_retries);
+
+                // after the first connect failure,
+                // we should invalidate the cache and wake up a new compute node
+                if num_retries == 0 {
+                    invalidate_cache(node_info);
+                    should_wake_with_error = Some(e);
+                }
             }
-            other => return other,
         }
 
         num_retries += 1;
-        info!(retries_left = num_retries, "retrying connect");
+        info!(num_retries, "retrying connect");
     }
 }
@@ -395,41 +416,51 @@ pub async fn try_wake(
     node_info: &mut console::CachedNodeInfo,
     extra: &console::ConsoleReqExtra<'_>,
     creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>,
-) -> Result<Option<bool>, WakeComputeError> {
+) -> Result<Option<ControlFlow<()>>, WakeComputeError> {
     info!("compute node's state has likely changed; requesting a wake-up");
     invalidate_cache(node_info);
     match creds.wake_compute(extra).await {
         // retry wake if the compute was in an invalid state
         Err(WakeComputeError::ApiError(ApiError::Console {
             status: StatusCode::BAD_REQUEST,
             ..
-        })) => Ok(Some(true)),
+        })) => Ok(Some(ControlFlow::Continue(()))),
         // Update `node_info` and try again.
         Ok(Some(mut new)) => {
             new.config.reuse_password(&node_info.config);
             *node_info = new;
-            Ok(Some(false))
+            Ok(Some(ControlFlow::Break(())))
         }
         Err(e) => Err(e),
         Ok(None) => Ok(None),
     }
 }
 
-fn retry_connect_in(err: &compute::ConnectionError, num_retries: u32) -> Option<time::Duration> {
+fn can_retry_error(err: &compute::ConnectionError, num_retries: u32) -> bool {
     use std::io::ErrorKind;
     match err {
-        // retry all errors at least once immediately
-        _ if num_retries == 0 => Some(time::Duration::ZERO),
-        // keep retrying connection errors every 100ms
-        compute::ConnectionError::CouldNotConnect(io_err) => match io_err.kind() {
-            ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable => {
-                // 3/2 = 1.5 which seems to be an ok growth factor heuristic
-                Some(BASE_RETRY_WAIT_DURATION * 3_u32.pow(num_retries) / 2_u32.pow(num_retries))
-            }
-            _ => None,
-        },
+        // retry all errors at least once
+        _ if num_retries == 0 => true,
+        // keep retrying connection errors
+        compute::ConnectionError::CouldNotConnect(io_err)
+            if num_retries < NUM_RETRIES_WAKE_COMPUTE =>
+        {
+            matches!(
+                io_err.kind(),
+                ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable
+            )
+        }
         // otherwise, don't retry
-        _ => None,
+        _ => false,
     }
 }
 
+pub fn retry_after(num_retries: u32) -> time::Duration {
+    match num_retries {
+        0 => time::Duration::ZERO,
+        _ => {
+            // 3/2 = 1.5 which seems to be an ok growth factor heuristic
+            BASE_RETRY_WAIT_DURATION * 3_u32.pow(num_retries) / 2_u32.pow(num_retries)
+        }
+    }
+}
@@ -1,6 +1,4 @@
 //! A group of high-level tests for connection establishing logic and auth.
-use std::io;
-
 use super::*;
 use crate::{auth, sasl, scram};
 use async_trait::async_trait;
@@ -299,14 +297,9 @@ async fn scram_auth_mock() -> anyhow::Result<()> {
 
 #[test]
 fn connect_compute_total_wait() {
-    let err = compute::ConnectionError::CouldNotConnect(io::Error::new(
-        io::ErrorKind::ConnectionRefused,
-        "conn refused",
-    ));
-
     let mut total_wait = tokio::time::Duration::ZERO;
     for num_retries in 0..10 {
-        total_wait += retry_connect_in(&err, num_retries).unwrap();
+        total_wait += retry_after(num_retries);
     }
     assert!(total_wait < tokio::time::Duration::from_secs(12));
     assert!(total_wait > tokio::time::Duration::from_secs(10));
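For reference, the asserted window follows from the geometric series behind `retry_after`: attempt 0 waits zero and attempt n waits roughly 100 ms * 1.5^n, so ten attempts wait about 100 ms * (1.5 + 1.5^2 + ... + 1.5^9) ≈ 100 ms * 112.3 ≈ 11.2 s in total, inside the (10 s, 12 s) bounds checked above.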