From 31026d5a3c246956dda9ba4925efdc72ded42de0 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Thu, 15 May 2025 17:13:15 +0100 Subject: [PATCH] pageserver: support import schema evolution (#11935) ## Problem Imports don't support schema evolution nicely. If we want to change the stuff we keep in storcon, we'd have to carry the old cruft around. ## Summary of changes Version import progress. Note that the import progress version determines the version of the import job split and execution. This means that we can also use it as a mechanism for deploying new import implementations in the future. --- libs/pageserver_api/src/models.rs | 7 ++- pageserver/src/controller_upcall_client.rs | 49 ++++++------------- pageserver/src/deletion_queue.rs | 2 +- .../src/tenant/timeline/import_pgdata.rs | 2 +- .../src/tenant/timeline/import_pgdata/flow.rs | 32 +++++++++--- 5 files changed, 50 insertions(+), 42 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 58b8d80c0a..e9b37c8ca6 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -342,7 +342,12 @@ pub enum ShardImportStatus { } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -pub struct ShardImportProgress { +pub enum ShardImportProgress { + V1(ShardImportProgressV1), +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +pub struct ShardImportProgressV1 { /// Total number of jobs in the import plan pub jobs: usize, /// Number of jobs completed diff --git a/pageserver/src/controller_upcall_client.rs b/pageserver/src/controller_upcall_client.rs index 779ef3e37d..dc38ea616c 100644 --- a/pageserver/src/controller_upcall_client.rs +++ b/pageserver/src/controller_upcall_client.rs @@ -59,7 +59,7 @@ pub trait StorageControllerUpcallApi { tenant_shard_id: TenantShardId, timeline_id: TimelineId, generation: Generation, - ) -> impl Future, RetryForeverError>> + Send; + ) -> impl Future> + Send; } impl StorageControllerUpcallClient { @@ -104,6 +104,7 @@ impl StorageControllerUpcallClient { &self, url: &url::Url, request: R, + method: reqwest::Method, ) -> Result where R: Serialize, @@ -113,7 +114,7 @@ impl StorageControllerUpcallClient { || async { let response = self .http_client - .post(url.clone()) + .request(method.clone(), url.clone()) .json(&request) .send() .await?; @@ -222,7 +223,9 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient { register: register.clone(), }; - let response: ReAttachResponse = self.retry_http_forever(&url, request).await?; + let response: ReAttachResponse = self + .retry_http_forever(&url, request, reqwest::Method::POST) + .await?; tracing::info!( "Received re-attach response with {} tenants (node {}, register: {:?})", response.tenants.len(), @@ -275,7 +278,9 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient { return Err(RetryForeverError::ShuttingDown); } - let response: ValidateResponse = self.retry_http_forever(&url, request).await?; + let response: ValidateResponse = self + .retry_http_forever(&url, request, reqwest::Method::POST) + .await?; for rt in response.tenants { result.insert(rt.id, rt.valid); } @@ -309,7 +314,8 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient { status, }; - self.retry_http_forever(&url, request).await + self.retry_http_forever(&url, request, reqwest::Method::POST) + .await } #[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context @@ -318,7 +324,7 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient { tenant_shard_id: TenantShardId, timeline_id: TimelineId, generation: Generation, - ) -> Result, RetryForeverError> { + ) -> Result { let url = self .base_url .join("timeline_import_status") @@ -330,32 +336,9 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient { generation, }; - Ok(backoff::retry( - || async { - let response = self - .http_client - .get(url.clone()) - .json(&request) - .send() - .await?; - - if let Err(err) = response.error_for_status_ref() { - if matches!(err.status(), Some(reqwest::StatusCode::NOT_FOUND)) { - return Ok(None); - } else { - return Err(err); - } - } - response.json::().await.map(Some) - }, - |_| false, - 3, - u32::MAX, - "storage controller upcall", - &self.cancel, - ) - .await - .ok_or(RetryForeverError::ShuttingDown)? - .expect("We retry forever, this should never be reached")) + let response: ShardImportStatus = self + .retry_http_forever(&url, request, reqwest::Method::GET) + .await?; + Ok(response) } } diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 0bbad87c09..7854fd9e36 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -804,7 +804,7 @@ mod test { _tenant_shard_id: TenantShardId, _timeline_id: TimelineId, _generation: Generation, - ) -> Result, RetryForeverError> { + ) -> Result { unimplemented!() } } diff --git a/pageserver/src/tenant/timeline/import_pgdata.rs b/pageserver/src/tenant/timeline/import_pgdata.rs index 602b20df97..658d867c18 100644 --- a/pageserver/src/tenant/timeline/import_pgdata.rs +++ b/pageserver/src/tenant/timeline/import_pgdata.rs @@ -58,7 +58,7 @@ pub async fn doit( .map_err(|_err| anyhow::anyhow!("Shut down while getting timeline import status"))?; info!(?shard_status, "peeking shard status"); - match shard_status.unwrap_or(ShardImportStatus::InProgress(None)) { + match shard_status { ShardImportStatus::InProgress(maybe_progress) => { let storage = importbucket_client::new(timeline.conf, &location, cancel.clone()).await?; diff --git a/pageserver/src/tenant/timeline/import_pgdata/flow.rs b/pageserver/src/tenant/timeline/import_pgdata/flow.rs index c8c3bdcdfb..3e10a4e6d6 100644 --- a/pageserver/src/tenant/timeline/import_pgdata/flow.rs +++ b/pageserver/src/tenant/timeline/import_pgdata/flow.rs @@ -44,7 +44,7 @@ use pageserver_api::key::{ slru_segment_size_to_key, }; use pageserver_api::keyspace::{contiguous_range_len, is_contiguous_range, singleton_range}; -use pageserver_api::models::{ShardImportProgress, ShardImportStatus}; +use pageserver_api::models::{ShardImportProgress, ShardImportProgressV1, ShardImportStatus}; use pageserver_api::reltag::{RelTag, SlruKind}; use pageserver_api::shard::ShardIdentity; use postgres_ffi::relfile_utils::parse_relfilename; @@ -74,6 +74,24 @@ pub async fn run( storage: RemoteStorageWrapper, import_progress: Option, ctx: &RequestContext, +) -> anyhow::Result<()> { + // Match how we run the import based on the progress version. + // If there's no import progress, it means that this is a new import + // and we can use whichever version we want. + match import_progress { + Some(ShardImportProgress::V1(progress)) => { + run_v1(timeline, control_file, storage, Some(progress), ctx).await + } + None => run_v1(timeline, control_file, storage, None, ctx).await, + } +} + +async fn run_v1( + timeline: Arc, + control_file: ControlFile, + storage: RemoteStorageWrapper, + import_progress: Option, + ctx: &RequestContext, ) -> anyhow::Result<()> { let planner = Planner { control_file, @@ -416,15 +434,17 @@ impl Plan { last_completed_job_idx = job_idx; if last_completed_job_idx % checkpoint_every == 0 { + let progress = ShardImportProgressV1 { + jobs: jobs_in_plan, + completed: last_completed_job_idx, + import_plan_hash, + }; + storcon_client.put_timeline_import_status( timeline.tenant_shard_id, timeline.timeline_id, timeline.generation, - ShardImportStatus::InProgress(Some(ShardImportProgress { - jobs: jobs_in_plan, - completed: last_completed_job_idx, - import_plan_hash, - })) + ShardImportStatus::InProgress(Some(ShardImportProgress::V1(progress))) ) .await .map_err(|_err| {