diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 58b8d80c0a..e9b37c8ca6 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -342,7 +342,12 @@ pub enum ShardImportStatus { } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -pub struct ShardImportProgress { +pub enum ShardImportProgress { + V1(ShardImportProgressV1), +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +pub struct ShardImportProgressV1 { /// Total number of jobs in the import plan pub jobs: usize, /// Number of jobs completed diff --git a/pageserver/src/controller_upcall_client.rs b/pageserver/src/controller_upcall_client.rs index 779ef3e37d..dc38ea616c 100644 --- a/pageserver/src/controller_upcall_client.rs +++ b/pageserver/src/controller_upcall_client.rs @@ -59,7 +59,7 @@ pub trait StorageControllerUpcallApi { tenant_shard_id: TenantShardId, timeline_id: TimelineId, generation: Generation, - ) -> impl Future, RetryForeverError>> + Send; + ) -> impl Future> + Send; } impl StorageControllerUpcallClient { @@ -104,6 +104,7 @@ impl StorageControllerUpcallClient { &self, url: &url::Url, request: R, + method: reqwest::Method, ) -> Result where R: Serialize, @@ -113,7 +114,7 @@ impl StorageControllerUpcallClient { || async { let response = self .http_client - .post(url.clone()) + .request(method.clone(), url.clone()) .json(&request) .send() .await?; @@ -222,7 +223,9 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient { register: register.clone(), }; - let response: ReAttachResponse = self.retry_http_forever(&url, request).await?; + let response: ReAttachResponse = self + .retry_http_forever(&url, request, reqwest::Method::POST) + .await?; tracing::info!( "Received re-attach response with {} tenants (node {}, register: {:?})", response.tenants.len(), @@ -275,7 +278,9 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient { return Err(RetryForeverError::ShuttingDown); } - let response: ValidateResponse = self.retry_http_forever(&url, request).await?; + let response: ValidateResponse = self + .retry_http_forever(&url, request, reqwest::Method::POST) + .await?; for rt in response.tenants { result.insert(rt.id, rt.valid); } @@ -309,7 +314,8 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient { status, }; - self.retry_http_forever(&url, request).await + self.retry_http_forever(&url, request, reqwest::Method::POST) + .await } #[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context @@ -318,7 +324,7 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient { tenant_shard_id: TenantShardId, timeline_id: TimelineId, generation: Generation, - ) -> Result, RetryForeverError> { + ) -> Result { let url = self .base_url .join("timeline_import_status") @@ -330,32 +336,9 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient { generation, }; - Ok(backoff::retry( - || async { - let response = self - .http_client - .get(url.clone()) - .json(&request) - .send() - .await?; - - if let Err(err) = response.error_for_status_ref() { - if matches!(err.status(), Some(reqwest::StatusCode::NOT_FOUND)) { - return Ok(None); - } else { - return Err(err); - } - } - response.json::().await.map(Some) - }, - |_| false, - 3, - u32::MAX, - "storage controller upcall", - &self.cancel, - ) - .await - .ok_or(RetryForeverError::ShuttingDown)? - .expect("We retry forever, this should never be reached")) + let response: ShardImportStatus = self + .retry_http_forever(&url, request, reqwest::Method::GET) + .await?; + Ok(response) } } diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 0bbad87c09..7854fd9e36 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -804,7 +804,7 @@ mod test { _tenant_shard_id: TenantShardId, _timeline_id: TimelineId, _generation: Generation, - ) -> Result, RetryForeverError> { + ) -> Result { unimplemented!() } } diff --git a/pageserver/src/tenant/timeline/import_pgdata.rs b/pageserver/src/tenant/timeline/import_pgdata.rs index 602b20df97..658d867c18 100644 --- a/pageserver/src/tenant/timeline/import_pgdata.rs +++ b/pageserver/src/tenant/timeline/import_pgdata.rs @@ -58,7 +58,7 @@ pub async fn doit( .map_err(|_err| anyhow::anyhow!("Shut down while getting timeline import status"))?; info!(?shard_status, "peeking shard status"); - match shard_status.unwrap_or(ShardImportStatus::InProgress(None)) { + match shard_status { ShardImportStatus::InProgress(maybe_progress) => { let storage = importbucket_client::new(timeline.conf, &location, cancel.clone()).await?; diff --git a/pageserver/src/tenant/timeline/import_pgdata/flow.rs b/pageserver/src/tenant/timeline/import_pgdata/flow.rs index c8c3bdcdfb..3e10a4e6d6 100644 --- a/pageserver/src/tenant/timeline/import_pgdata/flow.rs +++ b/pageserver/src/tenant/timeline/import_pgdata/flow.rs @@ -44,7 +44,7 @@ use pageserver_api::key::{ slru_segment_size_to_key, }; use pageserver_api::keyspace::{contiguous_range_len, is_contiguous_range, singleton_range}; -use pageserver_api::models::{ShardImportProgress, ShardImportStatus}; +use pageserver_api::models::{ShardImportProgress, ShardImportProgressV1, ShardImportStatus}; use pageserver_api::reltag::{RelTag, SlruKind}; use pageserver_api::shard::ShardIdentity; use postgres_ffi::relfile_utils::parse_relfilename; @@ -74,6 +74,24 @@ pub async fn run( storage: RemoteStorageWrapper, import_progress: Option, ctx: &RequestContext, +) -> anyhow::Result<()> { + // Match how we run the import based on the progress version. + // If there's no import progress, it means that this is a new import + // and we can use whichever version we want. + match import_progress { + Some(ShardImportProgress::V1(progress)) => { + run_v1(timeline, control_file, storage, Some(progress), ctx).await + } + None => run_v1(timeline, control_file, storage, None, ctx).await, + } +} + +async fn run_v1( + timeline: Arc, + control_file: ControlFile, + storage: RemoteStorageWrapper, + import_progress: Option, + ctx: &RequestContext, ) -> anyhow::Result<()> { let planner = Planner { control_file, @@ -416,15 +434,17 @@ impl Plan { last_completed_job_idx = job_idx; if last_completed_job_idx % checkpoint_every == 0 { + let progress = ShardImportProgressV1 { + jobs: jobs_in_plan, + completed: last_completed_job_idx, + import_plan_hash, + }; + storcon_client.put_timeline_import_status( timeline.tenant_shard_id, timeline.timeline_id, timeline.generation, - ShardImportStatus::InProgress(Some(ShardImportProgress { - jobs: jobs_in_plan, - completed: last_completed_job_idx, - import_plan_hash, - })) + ShardImportStatus::InProgress(Some(ShardImportProgress::V1(progress))) ) .await .map_err(|_err| {