diff --git a/Cargo.lock b/Cargo.lock
index 6df5d4a71e..f075b45e49 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4331,6 +4331,7 @@ dependencies = [
  "toml_edit",
  "tracing",
  "tracing-utils",
+ "twox-hash",
  "url",
  "utils",
  "uuid",
diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs
index 7e0bb7dc57..f2ba50a86f 100644
--- a/libs/pageserver_api/src/config.rs
+++ b/libs/pageserver_api/src/config.rs
@@ -305,6 +305,7 @@ impl From for tracing_utils::Protocol {
 pub struct TimelineImportConfig {
     pub import_job_concurrency: NonZeroUsize,
     pub import_job_soft_size_limit: NonZeroUsize,
+    pub import_job_checkpoint_threshold: NonZeroUsize,
 }
 
 pub mod statvfs {
@@ -661,6 +662,7 @@ impl Default for ConfigToml {
             timeline_import_config: TimelineImportConfig {
                 import_job_concurrency: NonZeroUsize::new(128).unwrap(),
                 import_job_soft_size_limit: NonZeroUsize::new(1024 * 1024 * 1024).unwrap(),
+                import_job_checkpoint_threshold: NonZeroUsize::new(128).unwrap(),
             },
         }
     }
diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index 89d531d671..58b8d80c0a 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -336,14 +336,25 @@ impl TimelineCreateRequest {
 #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
 pub enum ShardImportStatus {
-    InProgress,
+    InProgress(Option<ShardImportProgress>),
     Done,
     Error(String),
 }
 
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
+pub struct ShardImportProgress {
+    /// Total number of jobs in the import plan
+    pub jobs: usize,
+    /// Number of jobs completed
+    pub completed: usize,
+    /// Hash of the plan
+    pub import_plan_hash: u64,
+}
+
 impl ShardImportStatus {
     pub fn is_terminal(&self) -> bool {
         match self {
-            ShardImportStatus::InProgress => false,
+            ShardImportStatus::InProgress(_) => false,
             ShardImportStatus::Done | ShardImportStatus::Error(_) => true,
         }
     }
diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml
index 8abd504922..b7b3e0eaf1 100644
--- a/pageserver/Cargo.toml
+++ b/pageserver/Cargo.toml
@@ -96,6 +96,7 @@ strum.workspace = true
 strum_macros.workspace = true
 wal_decoder.workspace = true
 smallvec.workspace = true
+twox-hash.workspace = true
 
 [target.'cfg(target_os = "linux")'.dependencies]
 procfs.workspace = true
diff --git a/pageserver/src/tenant/timeline/import_pgdata.rs b/pageserver/src/tenant/timeline/import_pgdata.rs
index 5fac9e0ce7..602b20df97 100644
--- a/pageserver/src/tenant/timeline/import_pgdata.rs
+++ b/pageserver/src/tenant/timeline/import_pgdata.rs
@@ -1,6 +1,7 @@
 use std::sync::Arc;
 
 use anyhow::{Context, bail};
+use importbucket_client::{ControlFile, RemoteStorageWrapper};
 use pageserver_api::models::ShardImportStatus;
 use remote_storage::RemotePath;
 use tokio::task::JoinHandle;
@@ -57,115 +58,40 @@ pub async fn doit(
         .map_err(|_err| anyhow::anyhow!("Shut down while getting timeline import status"))?;
 
     info!(?shard_status, "peeking shard status");
-    match shard_status {
-        None | Some(ShardImportStatus::InProgress) => {
-            // TODO: checkpoint the progress into the IndexPart instead of restarting
-            // from the beginning.
-
-            //
-            // Wipe the slate clean - the flow does not allow resuming.
-            // We can implement resuming in the future by checkpointing the progress into the IndexPart.
-            //
-            info!("wipe the slate clean");
-            {
-                // TODO: do we need to hold GC lock for this?
-                let mut guard = timeline.layers.write().await;
-                assert!(
-                    guard.layer_map()?.open_layer.is_none(),
-                    "while importing, there should be no in-memory layer" // this just seems like a good place to assert it
-                );
-                let all_layers_keys = guard.all_persistent_layers();
-                let all_layers: Vec<_> = all_layers_keys
-                    .iter()
-                    .map(|key| guard.get_from_key(key))
-                    .collect();
-                let open = guard.open_mut().context("open_mut")?;
-
-                timeline.remote_client.schedule_gc_update(&all_layers)?;
-                open.finish_gc_timeline(&all_layers);
-            }
-
-            //
-            // Wait for pgdata to finish uploading
-            //
-            info!("wait for pgdata to reach status 'done'");
+    match shard_status.unwrap_or(ShardImportStatus::InProgress(None)) {
+        ShardImportStatus::InProgress(maybe_progress) => {
             let storage = importbucket_client::new(timeline.conf, &location, cancel.clone()).await?;
-            let status_prefix = RemotePath::from_string("status").unwrap();
-            let pgdata_status_key = status_prefix.join("pgdata");
-            loop {
-                let res = async {
-                    let pgdata_status: Option = storage
-                        .get_json(&pgdata_status_key)
-                        .await
-                        .context("get pgdata status")?;
-                    info!(?pgdata_status, "peeking pgdata status");
-                    if pgdata_status.map(|st| st.done).unwrap_or(false) {
-                        Ok(())
-                    } else {
-                        Err(anyhow::anyhow!("pgdata not done yet"))
-                    }
-                }
-                .await;
-                match res {
-                    Ok(_) => break,
-                    Err(err) => {
-                        info!(?err, "indefinitely waiting for pgdata to finish");
-                        if tokio::time::timeout(
-                            std::time::Duration::from_secs(10),
-                            cancel.cancelled(),
-                        )
-                        .await
-                        .is_ok()
-                        {
-                            bail!("cancelled while waiting for pgdata");
-                        }
-                    }
-                }
-            }
-            //
-            // Do the import
-            //
-            info!("do the import");
-            let control_file = storage.get_control_file().await?;
-            let base_lsn = control_file.base_lsn();
+            let control_file_res = if maybe_progress.is_none() {
+                // Only prepare the import once when there's no progress.
+                prepare_import(timeline, storage.clone(), &cancel).await
+            } else {
+                storage.get_control_file().await
+            };
 
-            info!("update TimelineMetadata based on LSNs from control file");
-            {
-                let pg_version = control_file.pg_version();
-                let _ctx: &RequestContext = ctx;
-                async move {
-                    // FIXME: The 'disk_consistent_lsn' should be the LSN at the *end* of the
-                    // checkpoint record, and prev_record_lsn should point to its beginning.
-                    // We should read the real end of the record from the WAL, but here we
-                    // just fake it.
-                    let disk_consistent_lsn = Lsn(base_lsn.0 + 8);
-                    let prev_record_lsn = base_lsn;
-                    let metadata = TimelineMetadata::new(
-                        disk_consistent_lsn,
-                        Some(prev_record_lsn),
-                        None,     // no ancestor
-                        Lsn(0),   // no ancestor lsn
-                        base_lsn, // latest_gc_cutoff_lsn
-                        base_lsn, // initdb_lsn
-                        pg_version,
+            let control_file = match control_file_res {
+                Ok(cf) => cf,
+                Err(err) => {
+                    return Err(
+                        terminate_flow_with_error(timeline, err, &storcon_client, &cancel).await,
                     );
-
-                    let _start_lsn = disk_consistent_lsn + 1;
-
-                    timeline
-                        .remote_client
-                        .schedule_index_upload_for_full_metadata_update(&metadata)?;
-
-                    timeline.remote_client.wait_completion().await?;
-
-                    anyhow::Ok(())
                 }
-            }
-            .await?;
+            };
 
-            flow::run(timeline.clone(), control_file, storage.clone(), ctx).await?;
+            let res = flow::run(
+                timeline.clone(),
+                control_file,
+                storage.clone(),
+                maybe_progress,
+                ctx,
+            )
+            .await;
+            if let Err(err) = res {
+                return Err(
+                    terminate_flow_with_error(timeline, err, &storcon_client, &cancel).await,
+                );
+            }
 
             // Communicate that shard is done.
            // Ensure at-least-once delivery of the upcall to storage controller
@@ -180,7 +106,6 @@ pub async fn doit(
                     timeline.tenant_shard_id,
                     timeline.timeline_id,
                     timeline.generation,
-                    // TODO(vlad): What about import errors?
                     ShardImportStatus::Done,
                 )
                 .await
@@ -188,16 +113,151 @@ pub async fn doit(
                     anyhow::anyhow!("Shut down while putting timeline import status")
                 })?;
         }
-        Some(ShardImportStatus::Error(err)) => {
+        ShardImportStatus::Error(err) => {
             info!(
                 "shard status indicates that the shard is done (error), skipping import {}",
                 err
             );
         }
-        Some(ShardImportStatus::Done) => {
+        ShardImportStatus::Done => {
             info!("shard status indicates that the shard is done (success), skipping import");
         }
     }
 
     Ok(())
 }
+
+async fn prepare_import(
+    timeline: &Arc<Timeline>,
+    storage: RemoteStorageWrapper,
+    cancel: &CancellationToken,
+) -> anyhow::Result<ControlFile> {
+    // Wipe the slate clean before starting the import as a precaution.
+    // This method is only called when there's no recorded checkpoint for the import
+    // in the storage controller.
+    //
+    // Note that this is split-brain safe (two imports for the same timeline shard running in
+    // different generations) because we go through the usual deletion path, including the deletion queue.
+    info!("wipe the slate clean");
+    {
+        // TODO: do we need to hold GC lock for this?
+        let mut guard = timeline.layers.write().await;
+        assert!(
+            guard.layer_map()?.open_layer.is_none(),
+            "while importing, there should be no in-memory layer" // this just seems like a good place to assert it
+        );
+        let all_layers_keys = guard.all_persistent_layers();
+        let all_layers: Vec<_> = all_layers_keys
+            .iter()
+            .map(|key| guard.get_from_key(key))
+            .collect();
+        let open = guard.open_mut().context("open_mut")?;
+
+        timeline.remote_client.schedule_gc_update(&all_layers)?;
+        open.finish_gc_timeline(&all_layers);
+    }
+
+    //
+    // Wait for pgdata to finish uploading
+    //
+    info!("wait for pgdata to reach status 'done'");
+    let status_prefix = RemotePath::from_string("status").unwrap();
+    let pgdata_status_key = status_prefix.join("pgdata");
+    loop {
+        let res = async {
+            let pgdata_status: Option = storage
+                .get_json(&pgdata_status_key)
+                .await
+                .context("get pgdata status")?;
+            info!(?pgdata_status, "peeking pgdata status");
+            if pgdata_status.map(|st| st.done).unwrap_or(false) {
+                Ok(())
+            } else {
+                Err(anyhow::anyhow!("pgdata not done yet"))
+            }
+        }
+        .await;
+        match res {
+            Ok(_) => break,
+            Err(err) => {
+                info!(?err, "indefinitely waiting for pgdata to finish");
+                if tokio::time::timeout(std::time::Duration::from_secs(10), cancel.cancelled())
+                    .await
+                    .is_ok()
+                {
+                    bail!("cancelled while waiting for pgdata");
+                }
+            }
+        }
+    }
+
+    let control_file = storage.get_control_file().await?;
+    let base_lsn = control_file.base_lsn();
+
+    info!("update TimelineMetadata based on LSNs from control file");
+    {
+        let pg_version = control_file.pg_version();
+        async move {
+            // FIXME: The 'disk_consistent_lsn' should be the LSN at the *end* of the
+            // checkpoint record, and prev_record_lsn should point to its beginning.
+            // We should read the real end of the record from the WAL, but here we
+            // just fake it.
+            let disk_consistent_lsn = Lsn(base_lsn.0 + 8);
+            let prev_record_lsn = base_lsn;
+            let metadata = TimelineMetadata::new(
+                disk_consistent_lsn,
+                Some(prev_record_lsn),
+                None,     // no ancestor
+                Lsn(0),   // no ancestor lsn
+                base_lsn, // latest_gc_cutoff_lsn
+                base_lsn, // initdb_lsn
+                pg_version,
+            );
+
+            let _start_lsn = disk_consistent_lsn + 1;
+
+            timeline
+                .remote_client
+                .schedule_index_upload_for_full_metadata_update(&metadata)?;
+
+            timeline.remote_client.wait_completion().await?;
+
+            anyhow::Ok(())
+        }
+    }
+    .await?;
+
+    Ok(control_file)
+}
+
+async fn terminate_flow_with_error(
+    timeline: &Arc<Timeline>,
+    error: anyhow::Error,
+    storcon_client: &StorageControllerUpcallClient,
+    cancel: &CancellationToken,
+) -> anyhow::Error {
+    // The import task is aborted on tenant shutdown, so in principle, it should
+    // never be cancelled. To be on the safe side, check the cancellation tokens
+    // before marking the import as failed.
+    if !(cancel.is_cancelled() || timeline.cancel.is_cancelled()) {
+        let notify_res = storcon_client
+            .put_timeline_import_status(
+                timeline.tenant_shard_id,
+                timeline.timeline_id,
+                timeline.generation,
+                ShardImportStatus::Error(format!("{error:#}")),
+            )
+            .await;
+
+        if let Err(_notify_error) = notify_res {
+            // The [`StorageControllerUpcallClient::put_timeline_import_status`] retries
+            // forever internally, so errors returned by it can only be due to cancellation.
+            info!("failed to notify storcon about permanent import error");
+        }
+
+        // Will be logged by [`Tenant::create_timeline_import_pgdata_task`]
+        error
+    } else {
+        anyhow::anyhow!("Import task cancelled")
+    }
+}
diff --git a/pageserver/src/tenant/timeline/import_pgdata/flow.rs b/pageserver/src/tenant/timeline/import_pgdata/flow.rs
index 5b9c8ec5b5..c8c3bdcdfb 100644
--- a/pageserver/src/tenant/timeline/import_pgdata/flow.rs
+++ b/pageserver/src/tenant/timeline/import_pgdata/flow.rs
@@ -29,10 +29,11 @@ //!    - version-specific CheckPointData (=> pgv abstraction, already exists for regular walingest)
 use std::collections::HashSet;
+use std::hash::{Hash, Hasher};
 use std::ops::Range;
 use std::sync::Arc;
 
-use anyhow::{bail, ensure};
+use anyhow::ensure;
 use bytes::Bytes;
 use futures::stream::FuturesOrdered;
 use itertools::Itertools;
@@ -43,6 +44,7 @@ use pageserver_api::key::{
     slru_segment_size_to_key,
 };
 use pageserver_api::keyspace::{contiguous_range_len, is_contiguous_range, singleton_range};
+use pageserver_api::models::{ShardImportProgress, ShardImportStatus};
 use pageserver_api::reltag::{RelTag, SlruKind};
 use pageserver_api::shard::ShardIdentity;
 use postgres_ffi::relfile_utils::parse_relfilename;
@@ -59,16 +61,18 @@ use super::Timeline;
 use super::importbucket_client::{ControlFile, RemoteStorageWrapper};
 use crate::assert_u64_eq_usize::UsizeIsU64;
 use crate::context::{DownloadBehavior, RequestContext};
+use crate::controller_upcall_client::{StorageControllerUpcallApi, StorageControllerUpcallClient};
 use crate::pgdatadir_mapping::{
     DbDirectory, RelDirectory, SlruSegmentDirectory, TwoPhaseDirectory,
 };
 use crate::task_mgr::TaskKind;
-use crate::tenant::storage_layer::{ImageLayerWriter, Layer};
+use crate::tenant::storage_layer::{AsLayerDesc, ImageLayerWriter, Layer};
 
 pub async fn run(
     timeline: Arc<Timeline>,
     control_file: ControlFile,
     storage: RemoteStorageWrapper,
+    import_progress: Option<ShardImportProgress>,
     ctx: &RequestContext,
 ) -> anyhow::Result<()> {
     let planner = Planner {
@@ -81,9 +85,31 @@ pub async fn run(
     let import_config = &timeline.conf.timeline_import_config;
     let plan = planner.plan(import_config).await?;
 
+    // Hash the plan and compare with the hash of the plan we got back from the storage controller.
+    // If the two match, it means that the planning stage had the same output.
+    //
+    // This is not intended to be a cryptographically secure hash.
+    const SEED: u64 = 42;
+    let mut hasher = twox_hash::XxHash64::with_seed(SEED);
+    plan.hash(&mut hasher);
+    let plan_hash = hasher.finish();
+
+    if let Some(progress) = &import_progress {
+        if plan_hash != progress.import_plan_hash {
+            anyhow::bail!("Import plan does not match storcon metadata");
+        }
+
+        // Guard against hash collisions between plans with a different number of jobs
+        if progress.jobs != plan.jobs.len() {
+            anyhow::bail!("Import plan job length does not match storcon metadata")
+        }
+    }
+
     pausable_failpoint!("import-timeline-pre-execute-pausable");
 
-    plan.execute(timeline, import_config, ctx).await
+    let start_from_job_idx = import_progress.map(|progress| progress.completed);
+    plan.execute(timeline, start_from_job_idx, plan_hash, import_config, ctx)
+        .await
 }
 
 struct Planner {
@@ -93,8 +119,11 @@ struct Planner {
     tasks: Vec,
 }
 
+#[derive(Hash)]
 struct Plan {
     jobs: Vec,
+    // Included here such that it ends up in the hash for the plan
+    shard: ShardIdentity,
 }
 
 impl Planner {
@@ -198,7 +227,10 @@ impl Planner {
             pgdata_lsn,
         ));
 
-        Ok(Plan { jobs })
+        Ok(Plan {
+            jobs,
+            shard: self.shard,
+        })
     }
 
     #[instrument(level = tracing::Level::DEBUG, skip_all, fields(dboid=%db.dboid, tablespace=%db.spcnode, path=%db.path))]
@@ -327,25 +359,45 @@ impl Plan {
     async fn execute(
         self,
         timeline: Arc<Timeline>,
+        start_after_job_idx: Option<usize>,
+        import_plan_hash: u64,
         import_config: &TimelineImportConfig,
         ctx: &RequestContext,
     ) -> anyhow::Result<()> {
+        let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &timeline.cancel);
+
         let mut work = FuturesOrdered::new();
         let semaphore = Arc::new(Semaphore::new(import_config.import_job_concurrency.into()));
 
         let jobs_in_plan = self.jobs.len();
 
-        let mut jobs = self.jobs.into_iter().enumerate().peekable();
-        let mut results = Vec::new();
+        let mut jobs = self
+            .jobs
+            .into_iter()
+            .enumerate()
+            .map(|(idx, job)| (idx + 1, job))
+            .filter(|(idx, _job)| {
+                // Filter out any jobs that have been done already
+                if let Some(start_after) = start_after_job_idx {
+                    *idx > start_after
+                } else {
+                    true
+                }
+            })
+            .peekable();
+
+        let mut last_completed_job_idx = start_after_job_idx.unwrap_or(0);
+        let checkpoint_every: usize = import_config.import_job_checkpoint_threshold.into();
 
         // Run import jobs concurrently up to the limit specified by the pageserver configuration.
         // Note that we process completed futures in the order of insertion. This will be the
         // building block for resuming imports across pageserver restarts or tenant migrations.
-        while results.len() < jobs_in_plan {
+        while last_completed_job_idx < jobs_in_plan {
             tokio::select! {
                 permit = semaphore.clone().acquire_owned(), if jobs.peek().is_some() => {
                     let permit = permit.expect("never closed");
                     let (job_idx, job) = jobs.next().expect("we peeked");
+                    let job_timeline = timeline.clone();
 
                     let ctx = ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Error);
@@ -357,13 +409,33 @@ impl Plan {
                 },
                 maybe_complete_job_idx = work.next() => {
                     match maybe_complete_job_idx {
-                        Some(Ok((_job_idx, res))) => {
-                            results.push(res);
+                        Some(Ok((job_idx, res))) => {
+                            assert!(last_completed_job_idx.checked_add(1).unwrap() == job_idx);
+
+                            res?;
+                            last_completed_job_idx = job_idx;
+
+                            if last_completed_job_idx % checkpoint_every == 0 {
+                                storcon_client.put_timeline_import_status(
+                                    timeline.tenant_shard_id,
+                                    timeline.timeline_id,
+                                    timeline.generation,
+                                    ShardImportStatus::InProgress(Some(ShardImportProgress {
+                                        jobs: jobs_in_plan,
+                                        completed: last_completed_job_idx,
+                                        import_plan_hash,
+                                    }))
+                                )
+                                .await
+                                .map_err(|_err| {
+                                    anyhow::anyhow!("Shut down while putting timeline import status")
+                                })?;
+                            }
                         },
                         Some(Err(_)) => {
-                            results.push(Err(anyhow::anyhow!(
-                                "parallel job panicked or cancelled, check pageserver logs"
-                            )));
+                            anyhow::bail!(
+                                "import job panicked or cancelled"
+                            );
                         }
                         None => {}
                     }
@@ -371,17 +443,7 @@ impl Plan {
             }
         }
 
-        if results.iter().all(|r| r.is_ok()) {
-            Ok(())
-        } else {
-            let mut msg = String::new();
-            for result in results {
-                if let Err(err) = result {
-                    msg.push_str(&format!("{err:?}\n\n"));
-                }
-            }
-            bail!("Some parallel jobs failed:\n\n{msg}");
-        }
+        Ok(())
     }
 }
 
@@ -553,6 +615,15 @@ struct ImportSingleKeyTask {
     buf: Bytes,
 }
 
+impl Hash for ImportSingleKeyTask {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        let ImportSingleKeyTask { key, buf } = self;
+
+        key.hash(state);
+        buf.hash(state);
+    }
+}
+
 impl ImportSingleKeyTask {
     fn new(key: Key, buf: Bytes) -> Self {
         ImportSingleKeyTask { key, buf }
@@ -581,6 +652,20 @@ struct ImportRelBlocksTask {
     storage: RemoteStorageWrapper,
 }
 
+impl Hash for ImportRelBlocksTask {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        let ImportRelBlocksTask {
+            shard_identity: _,
+            key_range,
+            path,
+            storage: _,
+        } = self;
+
+        key_range.hash(state);
+        path.hash(state);
+    }
+}
+
 impl ImportRelBlocksTask {
     fn new(
         shard_identity: ShardIdentity,
@@ -665,6 +750,19 @@ struct ImportSlruBlocksTask {
     storage: RemoteStorageWrapper,
 }
 
+impl Hash for ImportSlruBlocksTask {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        let ImportSlruBlocksTask {
+            key_range,
+            path,
+            storage: _,
+        } = self;
+
+        key_range.hash(state);
+        path.hash(state);
+    }
+}
+
 impl ImportSlruBlocksTask {
     fn new(key_range: Range, path: &RemotePath, storage: RemoteStorageWrapper) -> Self {
         ImportSlruBlocksTask {
@@ -707,6 +805,7 @@ impl ImportTask for ImportSlruBlocksTask {
     }
 }
 
+#[derive(Hash)]
 enum AnyImportTask {
     SingleKey(ImportSingleKeyTask),
     RelBlocks(ImportRelBlocksTask),
@@ -753,6 +852,7 @@ impl From for AnyImportTask {
     }
 }
 
+#[derive(Hash)]
 struct ChunkProcessingJob {
     range: Range,
     tasks: Vec,
@@ -790,17 +890,51 @@ impl ChunkProcessingJob {
         let resident_layer = if nimages > 0 {
             let (desc, path) = writer.finish(ctx).await?;
+
+            {
+                let guard = timeline.layers.read().await;
+                let existing_layer = guard.try_get_from_key(&desc.key());
+                if let Some(layer) = existing_layer {
+                    if layer.metadata().generation == timeline.generation {
+                        return Err(anyhow::anyhow!(
+                            "Import attempted to rewrite layer file in the same generation: {}",
+                            layer.local_path()
+                        ));
+                    }
+                }
+            }
+
             Layer::finish_creating(timeline.conf, &timeline, desc, &path)?
         } else {
             // dropping the writer cleans up
             return Ok(());
         };
 
-        // this is sharing the same code as create_image_layers
+        // The same import job might run multiple times since not every job is checkpointed.
+        // Hence, we must handle the case where the layer already exists. We cannot be
+        // certain that the existing layer is identical to the new one, so in that case
+        // we replace the old layer with the one we just generated.
+
         let mut guard = timeline.layers.write().await;
-        guard
-            .open_mut()?
-            .track_new_image_layers(&[resident_layer.clone()], &timeline.metrics);
+
+        let existing_layer = guard
+            .try_get_from_key(&resident_layer.layer_desc().key())
+            .cloned();
+        match existing_layer {
+            Some(existing) => {
+                guard.open_mut()?.rewrite_layers(
+                    &[(existing.clone(), resident_layer.clone())],
+                    &[],
+                    &timeline.metrics,
+                );
+            }
+            None => {
+                guard
+                    .open_mut()?
+                    .track_new_image_layers(&[resident_layer.clone()], &timeline.metrics);
+            }
+        }
+
         crate::tenant::timeline::drop_wlock(guard);
 
         timeline
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 852005639a..7e4bb627af 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -4082,7 +4082,7 @@ impl Service {
     /// imports are stored in the database).
     #[instrument(skip_all, fields(
         tenant_id=%import.tenant_id,
-        shard_id=%import.timeline_id,
+        timeline_id=%import.timeline_id,
     ))]
     async fn finalize_timeline_import(
         self: &Arc,
diff --git a/storage_controller/src/timeline_import.rs b/storage_controller/src/timeline_import.rs
index 5d9d633932..909e8e2899 100644
--- a/storage_controller/src/timeline_import.rs
+++ b/storage_controller/src/timeline_import.rs
@@ -5,7 +5,7 @@ use http_utils::error::ApiError;
 use reqwest::Method;
 use serde::{Deserialize, Serialize};
 
-use pageserver_api::models::ShardImportStatus;
+use pageserver_api::models::{ShardImportProgress, ShardImportStatus};
 use tokio_util::sync::CancellationToken;
 use utils::{
     id::{TenantId, TimelineId},
@@ -28,7 +28,12 @@ impl ShardImportStatuses {
         ShardImportStatuses(
             shards
                 .into_iter()
-                .map(|ts_id| (ts_id, ShardImportStatus::InProgress))
+                .map(|ts_id| {
+                    (
+                        ts_id,
+                        ShardImportStatus::InProgress(None::<ShardImportProgress>),
+                    )
+                })
                 .collect(),
         )
     }
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 2801a0e867..9d86fd027c 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -1255,6 +1255,12 @@ class NeonEnv:
             "no_sync": True,
             # Look for gaps in WAL received from safekeepers
             "validate_wal_contiguity": True,
+            # TODO(vlad): make these configurable through the builder
+            "timeline_import_config": {
+                "import_job_concurrency": 4,
+                "import_job_soft_size_limit": 512 * 1024,
+                "import_job_checkpoint_threshold": 4,
+            },
         }
 
         # Batching (https://github.com/neondatabase/neon/issues/9377):
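The plan-fingerprint check added in flow.rs is what makes resumption safe to trust: a restarted attempt replans from scratch and only reuses the stored checkpoint if the new plan hashes to the same value. The sketch below reproduces that pattern with `twox_hash::XxHash64` and the same fixed seed; `Job`, `Plan`, and `shard_number` are illustrative stand-ins, not the pageserver's real types.

```rust
use std::hash::{Hash, Hasher};

use twox_hash::XxHash64;

// Stand-ins for the real plan types; the real Plan derives Hash the same way.
#[derive(Hash)]
struct Job {
    start_key: u64,
    end_key: u64,
}

#[derive(Hash)]
struct Plan {
    jobs: Vec<Job>,
    shard_number: u8, // the real plan hashes the full ShardIdentity
}

/// Seeded, non-cryptographic fingerprint of a plan, mirroring flow::run.
fn plan_hash(plan: &Plan) -> u64 {
    const SEED: u64 = 42;
    let mut hasher = XxHash64::with_seed(SEED);
    plan.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let plan = Plan {
        jobs: vec![Job { start_key: 0, end_key: 8192 }],
        shard_number: 0,
    };
    // Recomputing the hash over an identical plan yields the same value,
    // which is what lets a restarted import validate the stored checkpoint.
    assert_eq!(plan_hash(&plan), plan_hash(&plan));
    println!("plan hash: {:#x}", plan_hash(&plan));
}
```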
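The resume bookkeeping in `Plan::execute` relies on 1-based job indices, an "already completed" filter, and a checkpoint every `import_job_checkpoint_threshold` completions. Below is a minimal sketch of just that arithmetic, with a hypothetical `checkpoint` callback standing in for the storage controller upcall.

```rust
/// Replays the indexing scheme from Plan::execute on plain data:
/// jobs are numbered 1..=N, a checkpoint records "jobs 1..=completed are done",
/// and a resumed run only executes jobs with idx > completed.
fn run_jobs(
    jobs: Vec<&'static str>,
    start_after: Option<usize>,
    checkpoint_every: usize,
    mut checkpoint: impl FnMut(usize, usize),
) {
    let total = jobs.len();
    let mut last_completed = start_after.unwrap_or(0);

    let remaining = jobs
        .into_iter()
        .enumerate()
        .map(|(idx, job)| (idx + 1, job)) // switch to 1-based indices
        .filter(|(idx, _)| start_after.map_or(true, |done| *idx > done));

    for (idx, job) in remaining {
        // The real code runs jobs concurrently but still completes them in order.
        println!("running job {idx}: {job}");
        assert_eq!(last_completed + 1, idx);
        last_completed = idx;

        if last_completed % checkpoint_every == 0 {
            checkpoint(total, last_completed);
        }
    }
}

fn main() {
    // A run that crashed after the checkpoint at job 4 resumes here and re-executes
    // only job 5, even if job 5 had already run once before the crash -- which is
    // why the jobs themselves must be idempotent.
    run_jobs(
        vec!["dbdir", "rel-a", "rel-b", "slru", "final"],
        Some(4),
        2,
        |total, done| println!("checkpoint: {done}/{total} jobs done"),
    );
}
```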
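For reference, this is roughly what the checkpoint payload looks like on the wire, assuming only the plain serde derives visible in the models.rs hunk (externally tagged enum, no extra attributes). The types below are local mirrors defined for the example, not imports from `pageserver_api`.

```rust
use serde::{Deserialize, Serialize};

// Local mirrors of the types added in libs/pageserver_api/src/models.rs.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct ShardImportProgress {
    jobs: usize,
    completed: usize,
    import_plan_hash: u64,
}

#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
enum ShardImportStatus {
    InProgress(Option<ShardImportProgress>),
    Done,
    Error(String),
}

fn main() -> anyhow::Result<()> {
    let checkpoint = ShardImportStatus::InProgress(Some(ShardImportProgress {
        jobs: 1024,
        completed: 128,
        import_plan_hash: 0xdead_beef,
    }));

    // With plain derives, serde uses the externally tagged representation:
    // {"InProgress":{"jobs":1024,"completed":128,"import_plan_hash":3735928559}}
    println!("{}", serde_json::to_string(&checkpoint)?);

    // A status without a recorded checkpoint serializes as {"InProgress":null}
    // and round-trips back to InProgress(None).
    let fresh = ShardImportStatus::InProgress(None);
    let json = serde_json::to_string(&fresh)?;
    assert_eq!(serde_json::from_str::<ShardImportStatus>(&json)?, fresh);
    Ok(())
}
```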
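Because a job that completed after the last checkpoint is re-executed on resume, writing its image layer must be idempotent: an existing layer from an older generation is rewritten, while a duplicate in the same generation is surfaced as an error. A toy model of that decision follows, with a `HashMap` standing in for the layer map; `FakeLayer` and `upsert_layer` are invented names for illustration only.

```rust
use std::collections::HashMap;

use anyhow::anyhow;

// Simplified stand-in: the real code tracks Layer objects in the LayerManager
// and reads the generation from the layer's remote metadata.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct FakeLayer {
    generation: u32,
}

/// Insert-or-rewrite with the same rule as the ChunkProcessingJob change:
/// a re-executed job may produce a layer key that is already tracked, which is
/// only acceptable if the previous copy was written in an older generation.
fn upsert_layer(
    layers: &mut HashMap<&'static str, FakeLayer>,
    key: &'static str,
    new: FakeLayer,
) -> anyhow::Result<()> {
    match layers.get(key) {
        Some(existing) if existing.generation == new.generation => Err(anyhow!(
            "Import attempted to rewrite layer file in the same generation: {key}"
        )),
        // Older generation: replace it, mirroring rewrite_layers().
        Some(_) => {
            layers.insert(key, new);
            Ok(())
        }
        // First time this key is produced, mirroring track_new_image_layers().
        None => {
            layers.insert(key, new);
            Ok(())
        }
    }
}

fn main() -> anyhow::Result<()> {
    let mut layers = HashMap::new();
    // First attempt writes the layer in generation 1.
    upsert_layer(&mut layers, "imported-image-layer", FakeLayer { generation: 1 })?;
    // A resumed attempt runs in a new generation and replaces the old layer.
    upsert_layer(&mut layers, "imported-image-layer", FakeLayer { generation: 2 })?;
    // Re-running the job within the same generation is refused.
    assert!(upsert_layer(&mut layers, "imported-image-layer", FakeLayer { generation: 2 }).is_err());
    println!("{layers:?}");
    Ok(())
}
```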