mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 01:12:56 +00:00
pull_timeline and sk logging fixes (#11786)
This patch contains some fixes for issues I ran into for #11712:
* Make `pull_timeline` return success for a timeline that already exists. This follows the general API design of storage components: API endpoints are retryable and converge to a status code, instead of starting to error. We change `pull_timeline`'s return type a little bit, because we might not actually have a source sk to pull from. Note that the fix is not enough: there is still a race when two `pull_timeline` instances happen in parallel, because we might try to enter both pulled timelines at the same time. That can be fixed later.
* Make `pull_timeline` support one safekeeper being down. In general, if one safekeeper is down, that's not a problem. The added comment explains a potential situation (found in the `test_lagging_sk` test, for example).
* Don't log very long errors when computes try to connect to safekeepers that don't have the timeline yet, if `allow_timeline_creation` is false. That flag is enabled when a sk connection string with generation numbers is passed to the compute, so we'll hit this code path more often. E.g. when a safekeeper missed a timeline creation, but the compute connects to it first, before the `pull_timeline` gets requested by the storcon reconciler: this is a perfectly normal situation. So don't log the whole error backtrace, and don't log it at the error log level, but only at info.
Part of #11670
This commit is contained in:
@@ -841,6 +841,10 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
|
||||
|
||||
let expected_end = match &end {
|
||||
ServerInitiated(_) | CopyDone | CopyFail | Terminate | EOF | Cancelled => true,
|
||||
// The timeline doesn't exist and we have been requested to not auto-create it.
|
||||
// Compute requests for timelines that haven't been created yet
|
||||
// might reach us before the storcon request to create those timelines.
|
||||
TimelineNoCreate => true,
|
||||
CopyStreamHandlerEnd::Disconnected(ConnectionError::Io(io_error))
|
||||
if is_expected_io_error(io_error) =>
|
||||
{
|
||||
@@ -1059,6 +1063,8 @@ pub enum CopyStreamHandlerEnd {
|
||||
Terminate,
|
||||
#[error("EOF on COPY stream")]
|
||||
EOF,
|
||||
#[error("timeline not found, and allow_timeline_creation is false")]
|
||||
TimelineNoCreate,
|
||||
/// The connection was lost
|
||||
#[error("connection error: {0}")]
|
||||
Disconnected(#[from] ConnectionError),
|
||||
|
||||
@@ -303,7 +303,8 @@ pub struct PullTimelineRequest {
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct PullTimelineResponse {
|
||||
// Donor safekeeper host
|
||||
pub safekeeper_host: String,
|
||||
/// Donor safekeeper host.
|
||||
/// None if no pull happened because the timeline already exists.
|
||||
pub safekeeper_host: Option<String>,
|
||||
// TODO: add more fields?
|
||||
}
|
||||
|
||||
@@ -401,7 +401,10 @@ pub async fn handle_request(
|
||||
request.timeline_id,
|
||||
));
|
||||
if existing_tli.is_ok() {
|
||||
bail!("Timeline {} already exists", request.timeline_id);
|
||||
info!("Timeline {} already exists", request.timeline_id);
|
||||
return Ok(PullTimelineResponse {
|
||||
safekeeper_host: None,
|
||||
});
|
||||
}
|
||||
|
||||
let mut http_client = reqwest::Client::builder();
|
||||
@@ -425,8 +428,25 @@ pub async fn handle_request(
|
||||
|
||||
let mut statuses = Vec::new();
|
||||
for (i, response) in responses.into_iter().enumerate() {
|
||||
let status = response.context(format!("fetching status from {}", http_hosts[i]))?;
|
||||
statuses.push((status, i));
|
||||
match response {
|
||||
Ok(status) => {
|
||||
statuses.push((status, i));
|
||||
}
|
||||
Err(e) => {
|
||||
info!("error fetching status from {}: {e}", http_hosts[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Allow missing responses from up to one safekeeper (say due to downtime)
|
||||
// e.g. if we created a timeline on PS A and B, with C being offline. Then B goes
|
||||
// offline and C comes online. Then we want a pull on C with A and B as hosts to work.
|
||||
let min_required_successful = (http_hosts.len() - 1).max(1);
|
||||
if statuses.len() < min_required_successful {
|
||||
bail!(
|
||||
"only got {} successful status responses. required: {min_required_successful}",
|
||||
statuses.len()
|
||||
)
|
||||
}
|
||||
|
||||
// Find the most advanced safekeeper
|
||||
@@ -536,6 +556,6 @@ async fn pull_timeline(
|
||||
.await?;
|
||||
|
||||
Ok(PullTimelineResponse {
|
||||
safekeeper_host: host,
|
||||
safekeeper_host: Some(host),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -32,7 +32,7 @@ use crate::metrics::{
|
||||
WAL_RECEIVERS,
|
||||
};
|
||||
use crate::safekeeper::{AcceptorProposerMessage, ProposerAcceptorMessage};
|
||||
use crate::timeline::WalResidentTimeline;
|
||||
use crate::timeline::{TimelineError, WalResidentTimeline};
|
||||
|
||||
const DEFAULT_FEEDBACK_CAPACITY: usize = 8;
|
||||
|
||||
@@ -357,9 +357,14 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'_, IO> {
|
||||
.await
|
||||
.context("create timeline")?
|
||||
} else {
|
||||
self.global_timelines
|
||||
.get(self.ttid)
|
||||
.context("get timeline")?
|
||||
let timeline_res = self.global_timelines.get(self.ttid);
|
||||
match timeline_res {
|
||||
Ok(tl) => tl,
|
||||
Err(TimelineError::NotFound(_)) => {
|
||||
return Err(CopyStreamHandlerEnd::TimelineNoCreate);
|
||||
}
|
||||
other => other.context("get_timeline")?,
|
||||
}
|
||||
};
|
||||
tli.wal_residence_guard().await?
|
||||
}
|
||||
|
||||
@@ -306,10 +306,11 @@ impl SafekeeperReconcilerInner {
|
||||
req,
|
||||
async |client| client.pull_timeline(&pull_req).await,
|
||||
|resp| {
|
||||
tracing::info!(
|
||||
"pulled timeline from {} onto {req_host}",
|
||||
resp.safekeeper_host,
|
||||
);
|
||||
if let Some(host) = resp.safekeeper_host {
|
||||
tracing::info!("pulled timeline from {host} onto {req_host}");
|
||||
} else {
|
||||
tracing::info!("timeline already present on safekeeper on {req_host}");
|
||||
}
|
||||
},
|
||||
req_cancel,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user