pageserver: support import schema evolution (#11935)

## Problem

Imports don't support schema evolution nicely. If we want to change the
stuff we keep in storcon,
we'd have to carry the old cruft around.

## Summary of changes

Version import progress. Note that the import progress version
determines the version of the import
job split and execution. This means that we can also use it as a
mechanism for deploying new import
implementations in the future.
This commit is contained in:
Vlad Lazar
2025-05-15 17:13:15 +01:00
committed by GitHub
parent 2621ce2daf
commit 31026d5a3c
5 changed files with 50 additions and 42 deletions

View File

@@ -342,7 +342,12 @@ pub enum ShardImportStatus {
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct ShardImportProgress {
pub enum ShardImportProgress {
V1(ShardImportProgressV1),
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct ShardImportProgressV1 {
/// Total number of jobs in the import plan
pub jobs: usize,
/// Number of jobs completed

View File

@@ -59,7 +59,7 @@ pub trait StorageControllerUpcallApi {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
generation: Generation,
) -> impl Future<Output = Result<Option<ShardImportStatus>, RetryForeverError>> + Send;
) -> impl Future<Output = Result<ShardImportStatus, RetryForeverError>> + Send;
}
impl StorageControllerUpcallClient {
@@ -104,6 +104,7 @@ impl StorageControllerUpcallClient {
&self,
url: &url::Url,
request: R,
method: reqwest::Method,
) -> Result<T, RetryForeverError>
where
R: Serialize,
@@ -113,7 +114,7 @@ impl StorageControllerUpcallClient {
|| async {
let response = self
.http_client
.post(url.clone())
.request(method.clone(), url.clone())
.json(&request)
.send()
.await?;
@@ -222,7 +223,9 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
register: register.clone(),
};
let response: ReAttachResponse = self.retry_http_forever(&url, request).await?;
let response: ReAttachResponse = self
.retry_http_forever(&url, request, reqwest::Method::POST)
.await?;
tracing::info!(
"Received re-attach response with {} tenants (node {}, register: {:?})",
response.tenants.len(),
@@ -275,7 +278,9 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
return Err(RetryForeverError::ShuttingDown);
}
let response: ValidateResponse = self.retry_http_forever(&url, request).await?;
let response: ValidateResponse = self
.retry_http_forever(&url, request, reqwest::Method::POST)
.await?;
for rt in response.tenants {
result.insert(rt.id, rt.valid);
}
@@ -309,7 +314,8 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
status,
};
self.retry_http_forever(&url, request).await
self.retry_http_forever(&url, request, reqwest::Method::POST)
.await
}
#[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
@@ -318,7 +324,7 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
generation: Generation,
) -> Result<Option<ShardImportStatus>, RetryForeverError> {
) -> Result<ShardImportStatus, RetryForeverError> {
let url = self
.base_url
.join("timeline_import_status")
@@ -330,32 +336,9 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
generation,
};
Ok(backoff::retry(
|| async {
let response = self
.http_client
.get(url.clone())
.json(&request)
.send()
.await?;
if let Err(err) = response.error_for_status_ref() {
if matches!(err.status(), Some(reqwest::StatusCode::NOT_FOUND)) {
return Ok(None);
} else {
return Err(err);
}
}
response.json::<ShardImportStatus>().await.map(Some)
},
|_| false,
3,
u32::MAX,
"storage controller upcall",
&self.cancel,
)
.await
.ok_or(RetryForeverError::ShuttingDown)?
.expect("We retry forever, this should never be reached"))
let response: ShardImportStatus = self
.retry_http_forever(&url, request, reqwest::Method::GET)
.await?;
Ok(response)
}
}

View File

@@ -804,7 +804,7 @@ mod test {
_tenant_shard_id: TenantShardId,
_timeline_id: TimelineId,
_generation: Generation,
) -> Result<Option<ShardImportStatus>, RetryForeverError> {
) -> Result<ShardImportStatus, RetryForeverError> {
unimplemented!()
}
}

View File

@@ -58,7 +58,7 @@ pub async fn doit(
.map_err(|_err| anyhow::anyhow!("Shut down while getting timeline import status"))?;
info!(?shard_status, "peeking shard status");
match shard_status.unwrap_or(ShardImportStatus::InProgress(None)) {
match shard_status {
ShardImportStatus::InProgress(maybe_progress) => {
let storage =
importbucket_client::new(timeline.conf, &location, cancel.clone()).await?;

View File

@@ -44,7 +44,7 @@ use pageserver_api::key::{
slru_segment_size_to_key,
};
use pageserver_api::keyspace::{contiguous_range_len, is_contiguous_range, singleton_range};
use pageserver_api::models::{ShardImportProgress, ShardImportStatus};
use pageserver_api::models::{ShardImportProgress, ShardImportProgressV1, ShardImportStatus};
use pageserver_api::reltag::{RelTag, SlruKind};
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::relfile_utils::parse_relfilename;
@@ -74,6 +74,24 @@ pub async fn run(
storage: RemoteStorageWrapper,
import_progress: Option<ShardImportProgress>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
// Match how we run the import based on the progress version.
// If there's no import progress, it means that this is a new import
// and we can use whichever version we want.
match import_progress {
Some(ShardImportProgress::V1(progress)) => {
run_v1(timeline, control_file, storage, Some(progress), ctx).await
}
None => run_v1(timeline, control_file, storage, None, ctx).await,
}
}
async fn run_v1(
timeline: Arc<Timeline>,
control_file: ControlFile,
storage: RemoteStorageWrapper,
import_progress: Option<ShardImportProgressV1>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let planner = Planner {
control_file,
@@ -416,15 +434,17 @@ impl Plan {
last_completed_job_idx = job_idx;
if last_completed_job_idx % checkpoint_every == 0 {
let progress = ShardImportProgressV1 {
jobs: jobs_in_plan,
completed: last_completed_job_idx,
import_plan_hash,
};
storcon_client.put_timeline_import_status(
timeline.tenant_shard_id,
timeline.timeline_id,
timeline.generation,
ShardImportStatus::InProgress(Some(ShardImportProgress {
jobs: jobs_in_plan,
completed: last_completed_job_idx,
import_plan_hash,
}))
ShardImportStatus::InProgress(Some(ShardImportProgress::V1(progress)))
)
.await
.map_err(|_err| {