mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 14:40:37 +00:00
## Problem Timeline imports do not have progress checkpointing. Any time that the tenant is shut down, all progress is lost and the import restarts from the beginning when the tenant is re-attached. ## Summary of changes This PR adds progress checkpointing. ### Preliminaries The **unit of work** is a `ChunkProcessingJob`. Each `ChunkProcessingJob` deals with the import for a set of key ranges. The job split is done by using an estimation of how many pages each job will produce. The planning stage must be **pure**: given a fixed set of contents in the import bucket, it will always yield the same plan. This property is enforced by checking that the hash of the plan is identical when resuming from a checkpoint. The storage controller tracks the progress of each shard in the import in the database in the form of the **latest job** that has completed. ### Flow This is the high-level flow for the happy path: 1. On the first run of the import task, the import task queries storcon for the progress and sees that none is recorded. 2. Execute the preparatory stage of the import 3. Import jobs start running concurrently in a `FuturesOrdered`. Every time the checkpointing threshold of jobs has been reached, notify the storage controller. 4. Tenant is detached and re-attached 5. Import task starts up again and gets the latest progress checkpoint from the storage controller in the form of a job index. 6. The plan is computed again and we check that the hash matches with the original plan. 7. Jobs are spawned from where the previous import task left off. Note that we will not report progress after the completion of each job, so some jobs might run twice. Closes https://github.com/neondatabase/neon/issues/11568 Closes https://github.com/neondatabase/neon/issues/11664
282 lines
8.4 KiB
Rust
282 lines
8.4 KiB
Rust
use std::time::Duration;
|
|
use std::{collections::HashMap, str::FromStr};
|
|
|
|
use http_utils::error::ApiError;
|
|
use reqwest::Method;
|
|
use serde::{Deserialize, Serialize};
|
|
|
|
use pageserver_api::models::{ShardImportProgress, ShardImportStatus};
|
|
use tokio_util::sync::CancellationToken;
|
|
use utils::{
|
|
id::{TenantId, TimelineId},
|
|
shard::ShardIndex,
|
|
};
|
|
|
|
use crate::{persistence::TimelineImportPersistence, service::Config};
|
|
|
|
#[derive(Deserialize, Serialize, PartialEq, Eq)]
|
|
pub(crate) enum TimelineImportState {
|
|
Importing,
|
|
Idle,
|
|
}
|
|
|
|
#[derive(Serialize, Deserialize, Clone, Debug)]
|
|
pub(crate) struct ShardImportStatuses(pub(crate) HashMap<ShardIndex, ShardImportStatus>);
|
|
|
|
impl ShardImportStatuses {
|
|
pub(crate) fn new(shards: Vec<ShardIndex>) -> Self {
|
|
ShardImportStatuses(
|
|
shards
|
|
.into_iter()
|
|
.map(|ts_id| {
|
|
(
|
|
ts_id,
|
|
ShardImportStatus::InProgress(None::<ShardImportProgress>),
|
|
)
|
|
})
|
|
.collect(),
|
|
)
|
|
}
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub(crate) struct TimelineImport {
|
|
pub(crate) tenant_id: TenantId,
|
|
pub(crate) timeline_id: TimelineId,
|
|
pub(crate) shard_statuses: ShardImportStatuses,
|
|
}
|
|
|
|
/// What the caller has to do after a successful [`TimelineImport::update`].
pub(crate) enum TimelineImportUpdateFollowUp {
    /// The stored status changed; the caller is expected to persist the new state.
    Persist,
    /// The update did not change anything; no follow-up is needed.
    None,
}
|
|
|
|
#[derive(thiserror::Error, Debug)]
|
|
pub(crate) enum TimelineImportFinalizeError {
|
|
#[error("Shut down interrupted import finalize")]
|
|
ShuttingDown,
|
|
#[error("Mismatched shard detected during import finalize: {0}")]
|
|
MismatchedShards(ShardIndex),
|
|
}
|
|
|
|
pub(crate) enum TimelineImportUpdateError {
|
|
ImportNotFound {
|
|
tenant_id: TenantId,
|
|
timeline_id: TimelineId,
|
|
},
|
|
MismatchedShards,
|
|
UnexpectedUpdate,
|
|
}
|
|
|
|
impl From<TimelineImportUpdateError> for ApiError {
|
|
fn from(err: TimelineImportUpdateError) -> ApiError {
|
|
match err {
|
|
TimelineImportUpdateError::ImportNotFound {
|
|
tenant_id,
|
|
timeline_id,
|
|
} => ApiError::NotFound(
|
|
anyhow::anyhow!("Import for {tenant_id}/{timeline_id} not found").into(),
|
|
),
|
|
TimelineImportUpdateError::MismatchedShards => {
|
|
ApiError::InternalServerError(anyhow::anyhow!(
|
|
"Import shards do not match update request, likely a shard split happened during import, this is a bug"
|
|
))
|
|
}
|
|
TimelineImportUpdateError::UnexpectedUpdate => {
|
|
ApiError::InternalServerError(anyhow::anyhow!("Update request is unexpected"))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
impl TimelineImport {
|
|
pub(crate) fn from_persistent(persistent: TimelineImportPersistence) -> anyhow::Result<Self> {
|
|
let tenant_id = TenantId::from_str(persistent.tenant_id.as_str())?;
|
|
let timeline_id = TimelineId::from_str(persistent.timeline_id.as_str())?;
|
|
let shard_statuses = serde_json::from_value(persistent.shard_statuses)?;
|
|
|
|
Ok(TimelineImport {
|
|
tenant_id,
|
|
timeline_id,
|
|
shard_statuses,
|
|
})
|
|
}
|
|
|
|
pub(crate) fn to_persistent(&self) -> TimelineImportPersistence {
|
|
TimelineImportPersistence {
|
|
tenant_id: self.tenant_id.to_string(),
|
|
timeline_id: self.timeline_id.to_string(),
|
|
shard_statuses: serde_json::to_value(self.shard_statuses.clone()).unwrap(),
|
|
}
|
|
}
|
|
|
|
pub(crate) fn update(
|
|
&mut self,
|
|
shard: ShardIndex,
|
|
status: ShardImportStatus,
|
|
) -> Result<TimelineImportUpdateFollowUp, TimelineImportUpdateError> {
|
|
use std::collections::hash_map::Entry::*;
|
|
|
|
match self.shard_statuses.0.entry(shard) {
|
|
Occupied(mut occ) => {
|
|
let crnt = occ.get_mut();
|
|
if *crnt == status {
|
|
Ok(TimelineImportUpdateFollowUp::None)
|
|
} else if crnt.is_terminal() && *crnt != status {
|
|
Err(TimelineImportUpdateError::UnexpectedUpdate)
|
|
} else {
|
|
*crnt = status;
|
|
Ok(TimelineImportUpdateFollowUp::Persist)
|
|
}
|
|
}
|
|
Vacant(_) => Err(TimelineImportUpdateError::MismatchedShards),
|
|
}
|
|
}
|
|
|
|
pub(crate) fn is_complete(&self) -> bool {
|
|
self.shard_statuses
|
|
.0
|
|
.values()
|
|
.all(|status| status.is_terminal())
|
|
}
|
|
|
|
pub(crate) fn completion_error(&self) -> Option<String> {
|
|
assert!(self.is_complete());
|
|
|
|
let shard_errors: HashMap<_, _> = self
|
|
.shard_statuses
|
|
.0
|
|
.iter()
|
|
.filter_map(|(shard, status)| {
|
|
if let ShardImportStatus::Error(err) = status {
|
|
Some((*shard, err.clone()))
|
|
} else {
|
|
None
|
|
}
|
|
})
|
|
.collect();
|
|
|
|
if shard_errors.is_empty() {
|
|
None
|
|
} else {
|
|
Some(serde_json::to_string(&shard_errors).unwrap())
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Outcome of a timeline import: `Ok(())` on success, or `Err` carrying an error string.
pub(crate) type ImportResult = Result<(), String>;
|
|
|
|
pub(crate) struct UpcallClient {
|
|
authorization_header: Option<String>,
|
|
client: reqwest::Client,
|
|
cancel: CancellationToken,
|
|
base_url: String,
|
|
}
|
|
|
|
/// Timeout applied both to the upcall HTTP client and to each import-complete request.
const IMPORT_COMPLETE_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
|
|
|
|
#[derive(Serialize, Deserialize, Debug)]
|
|
struct ImportCompleteRequest {
|
|
tenant_id: TenantId,
|
|
timeline_id: TimelineId,
|
|
error: Option<String>,
|
|
}
|
|
|
|
impl UpcallClient {
|
|
pub(crate) fn new(config: &Config, cancel: CancellationToken) -> Self {
|
|
let authorization_header = config
|
|
.control_plane_jwt_token
|
|
.clone()
|
|
.map(|jwt| format!("Bearer {}", jwt));
|
|
|
|
let client = reqwest::ClientBuilder::new()
|
|
.timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT)
|
|
.build()
|
|
.expect("Failed to construct HTTP client");
|
|
|
|
let base_url = config
|
|
.control_plane_url
|
|
.clone()
|
|
.expect("must be configured");
|
|
|
|
Self {
|
|
authorization_header,
|
|
client,
|
|
cancel,
|
|
base_url,
|
|
}
|
|
}
|
|
|
|
/// Notify control plane of a completed import
|
|
///
|
|
/// This method guarantees at least once delivery semantics assuming
|
|
/// eventual cplane availability. The cplane API is idempotent.
|
|
pub(crate) async fn notify_import_complete(
|
|
&self,
|
|
tenant_id: TenantId,
|
|
timeline_id: TimelineId,
|
|
import_result: ImportResult,
|
|
) -> anyhow::Result<()> {
|
|
let endpoint = if self.base_url.ends_with('/') {
|
|
format!("{}import_complete", self.base_url)
|
|
} else {
|
|
format!("{}/import_complete", self.base_url)
|
|
};
|
|
|
|
let request = self
|
|
.client
|
|
.request(Method::PUT, endpoint)
|
|
.json(&ImportCompleteRequest {
|
|
tenant_id,
|
|
timeline_id,
|
|
error: import_result.err(),
|
|
})
|
|
.timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT);
|
|
|
|
let request = if let Some(auth) = &self.authorization_header {
|
|
request.header(reqwest::header::AUTHORIZATION, auth)
|
|
} else {
|
|
request
|
|
};
|
|
|
|
const RETRY_DELAY: Duration = Duration::from_secs(1);
|
|
let mut attempt = 1;
|
|
|
|
loop {
|
|
if self.cancel.is_cancelled() {
|
|
return Err(anyhow::anyhow!(
|
|
"Shutting down while notifying cplane of import completion"
|
|
));
|
|
}
|
|
|
|
match request.try_clone().unwrap().send().await {
|
|
Ok(response) if response.status().is_success() => {
|
|
return Ok(());
|
|
}
|
|
Ok(response) => {
|
|
tracing::warn!(
|
|
"Import complete notification failed with status {}, attempt {}",
|
|
response.status(),
|
|
attempt
|
|
);
|
|
}
|
|
Err(e) => {
|
|
tracing::warn!(
|
|
"Import complete notification failed with error: {}, attempt {}",
|
|
e,
|
|
attempt
|
|
);
|
|
}
|
|
}
|
|
|
|
tokio::select! {
|
|
_ = tokio::time::sleep(RETRY_DELAY) => {}
|
|
_ = self.cancel.cancelled() => {
|
|
return Err(anyhow::anyhow!("Shutting down while notifying cplane of import completion"));
|
|
}
|
|
}
|
|
attempt += 1;
|
|
}
|
|
}
|
|
}
|