diff --git a/pageserver/src/tenant/timeline/import_pgdata/flow.rs b/pageserver/src/tenant/timeline/import_pgdata/flow.rs
index 9743aa3f26..bf3c7eeda6 100644
--- a/pageserver/src/tenant/timeline/import_pgdata/flow.rs
+++ b/pageserver/src/tenant/timeline/import_pgdata/flow.rs
@@ -11,19 +11,7 @@
 //! - => S3 as the source for the PGDATA instead of local filesystem
 //!
 //! TODOs before productionization:
-//! - ChunkProcessingJob size / ImportJob::total_size does not account for sharding.
-//!   => produced image layers likely too small.
 //! - ChunkProcessingJob should cut up an ImportJob to hit exactly target image layer size.
-//! - asserts / unwraps need to be replaced with errors
-//! - don't trust remote objects will be small (=prevent OOMs in those cases)
-//!   - limit all in-memory buffers in size, or download to disk and read from there
-//! - limit task concurrency
-//! - generally play nice with other tenants in the system
-//!   - importbucket is different bucket than main pageserver storage, so, should be fine wrt S3 rate limits
-//!   - but concerns like network bandwidth, local disk write bandwidth, local disk capacity, etc
-//! - integrate with layer eviction system
-//! - audit for Tenant::cancel nor Timeline::cancel responsivity
-//! - audit for Tenant/Timeline gate holding (we spawn tokio tasks during this flow!)
 //!
 //! An incomplete set of TODOs from the Hackathon:
 //! - version-specific CheckPointData (=> pgv abstraction, already exists for regular walingest)
@@ -44,7 +32,7 @@ use pageserver_api::key::{
     rel_dir_to_key, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
     slru_segment_size_to_key,
 };
-use pageserver_api::keyspace::{contiguous_range_len, is_contiguous_range, singleton_range};
+use pageserver_api::keyspace::{ShardedRange, singleton_range};
 use pageserver_api::models::{ShardImportProgress, ShardImportProgressV1, ShardImportStatus};
 use pageserver_api::reltag::{RelTag, SlruKind};
 use pageserver_api::shard::ShardIdentity;
@@ -167,6 +155,7 @@ impl Planner {
     /// This function is and must remain pure: given the same input, it will generate the same import plan.
     async fn plan(mut self, import_config: &TimelineImportConfig) -> anyhow::Result<Plan> {
         let pgdata_lsn = Lsn(self.control_file.control_file_data().checkPoint).align();
+        anyhow::ensure!(pgdata_lsn.is_valid());
 
         let datadir = PgDataDir::new(&self.storage).await?;
 
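The new `anyhow::ensure!(pgdata_lsn.is_valid())` guard rejects a zeroed checkpoint LSN (e.g. from a corrupt or uninitialized control file) before any planning happens. A minimal sketch of the semantics it relies on, assuming the pageserver's `Lsn` conventions (0 as the invalid sentinel, `align` rounding up to an 8-byte boundary); the real type lives in the pageserver's utils crate:

```rust
// Sketch only; not the actual pageserver type.
#[derive(Clone, Copy, PartialEq, Eq)]
struct Lsn(u64);

impl Lsn {
    const INVALID: Lsn = Lsn(0);

    // Round up to the next 8-byte boundary, matching WAL record alignment.
    fn align(self) -> Lsn {
        Lsn((self.0 + 7) & !7)
    }

    fn is_valid(self) -> bool {
        self != Lsn::INVALID
    }
}

fn main() {
    // A zeroed control file yields Lsn(0), which must fail the guard:
    assert!(!Lsn(0).align().is_valid());
    assert!(Lsn(0x4F).align().is_valid());
}
```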
@@ -249,14 +238,22 @@ impl Planner {
         });
 
         // Assigns parts of key space to later parallel jobs
+        // Note: The image layers produced here may have gaps, meaning there
+        // is not an image for each key in the layer's key range.
+        // The read path stops traversal at the first image layer, regardless
+        // of whether a base image has been found for a key or not.
+        // (The concept of sparse image layers doesn't exist.)
+        // This behavior is exactly right for the base image layers we're producing here.
+        // But, since no other place in the code currently produces image layers with gaps,
+        // it seems noteworthy.
         let mut last_end_key = Key::MIN;
         let mut current_chunk = Vec::new();
         let mut current_chunk_size: usize = 0;
         let mut jobs = Vec::new();
         for task in std::mem::take(&mut self.tasks).into_iter() {
-            if current_chunk_size + task.total_size()
-                > import_config.import_job_soft_size_limit.into()
-            {
+            let task_size = task.total_size(&self.shard);
+            let projected_chunk_size = current_chunk_size.saturating_add(task_size);
+            if projected_chunk_size > import_config.import_job_soft_size_limit.into() {
                 let key_range = last_end_key..task.key_range().start;
                 jobs.push(ChunkProcessingJob::new(
                     key_range.clone(),
@@ -266,7 +263,7 @@ impl Planner {
                 last_end_key = key_range.end;
                 current_chunk_size = 0;
             }
-            current_chunk_size += task.total_size();
+            current_chunk_size = current_chunk_size.saturating_add(task_size);
             current_chunk.push(task);
         }
         jobs.push(ChunkProcessingJob::new(
@@ -604,18 +601,18 @@ impl PgDataDirDb {
             };
 
             let path = datadir_path.join(rel_tag.to_segfile_name(segno));
-            assert!(filesize % BLCKSZ as usize == 0); // TODO: this should result in an error
+            anyhow::ensure!(filesize % BLCKSZ as usize == 0);
             let nblocks = filesize / BLCKSZ as usize;
 
-            PgDataDirDbFile {
+            Ok(PgDataDirDbFile {
                 path,
                 filesize,
                 rel_tag,
                 segno,
                 nblocks: Some(nblocks), // first non-cummulative sizes
-            }
+            })
         })
-        .collect();
+        .collect::<anyhow::Result<Vec<_>>>()?;
 
         // Set cummulative sizes. Do all of that math here, so that later we could easier
         // parallelize over segments and know with which segments we need to write relsize
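The chunking loop above is a greedy packer: tasks accumulate into the current chunk until the next task would push it past the soft limit, at which point the chunk is sealed into a `ChunkProcessingJob`. A standalone sketch of that policy (hypothetical `pack` helper; plain sizes stand in for import tasks, and key-range bookkeeping is omitted):

```rust
/// Greedy packing sketch: seal the current chunk as soon as the next task
/// would exceed the soft limit, then start a new chunk with that task.
fn pack(task_sizes: &[usize], soft_limit: usize) -> Vec<Vec<usize>> {
    let mut jobs = Vec::new();
    let mut chunk = Vec::new();
    let mut chunk_size: usize = 0;
    for &size in task_sizes {
        // saturating_add mirrors the patch: a usize::MAX-sized task
        // (non-contiguous range) must not wrap around and slip under the limit.
        if chunk_size.saturating_add(size) > soft_limit && !chunk.is_empty() {
            // The real code also records the covered key range here, and may
            // seal an empty chunk to keep the job key ranges contiguous.
            jobs.push(std::mem::take(&mut chunk));
            chunk_size = 0;
        }
        chunk_size = chunk_size.saturating_add(size);
        chunk.push(size);
    }
    if !chunk.is_empty() {
        jobs.push(chunk);
    }
    jobs
}

fn main() {
    // Three 48 KiB tasks against a 100 KiB soft limit: two jobs.
    let jobs = pack(&[49_152, 49_152, 49_152], 100 * 1024);
    assert_eq!(jobs, vec![vec![49_152, 49_152], vec![49_152]]);
}
```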
@@ -650,12 +647,22 @@
 trait ImportTask {
     fn key_range(&self) -> Range<Key>;
 
-    fn total_size(&self) -> usize {
-        // TODO: revisit this
-        if is_contiguous_range(&self.key_range()) {
-            contiguous_range_len(&self.key_range()) as usize * 8192
+    fn total_size(&self, shard_identity: &ShardIdentity) -> usize {
+        let range = ShardedRange::new(self.key_range(), shard_identity);
+        let page_count = range.page_count();
+        if page_count == u32::MAX {
+            tracing::warn!(
+                "Import task has non-contiguous key range: {}..{}",
+                self.key_range().start,
+                self.key_range().end
+            );
+
+            // Tasks should operate on contiguous ranges. It is unexpected for
+            // ranges to violate this assumption. Calling code handles this by mapping
+            // any task on a non-contiguous range to its own image layer.
+            usize::MAX
         } else {
-            u32::MAX as usize
+            page_count as usize * 8192
         }
     }
 
@@ -753,6 +760,8 @@ impl ImportTask for ImportRelBlocksTask {
         layer_writer: &mut ImageLayerWriter,
         ctx: &RequestContext,
     ) -> anyhow::Result<usize> {
+        const MAX_BYTE_RANGE_SIZE: usize = 128 * 1024 * 1024;
+
         debug!("Importing relation file");
 
         let (rel_tag, start_blk) = self.key_range.start.to_rel_block()?;
@@ -777,7 +786,7 @@ impl ImportTask for ImportRelBlocksTask {
                 assert_eq!(key.len(), 1);
                 assert!(!acc.is_empty());
                 assert!(acc_end > acc_start);
-                if acc_end == start /* TODO additional max range check here, to limit memory consumption per task to X */ {
+                if acc_end == start && end - acc_start <= MAX_BYTE_RANGE_SIZE {
                     acc.push(key.pop().unwrap());
                     Ok((acc, acc_start, end))
                 } else {
@@ -792,8 +801,8 @@
                 .get_range(&self.path, range_start.into_u64(), range_end.into_u64())
                 .await?;
             let mut buf = Bytes::from(range_buf);
-            // TODO: batched writes
             for key in keys {
+                // The writer buffers writes internally
                 let image = buf.split_to(8192);
                 layer_writer.put_image(key, image, ctx).await?;
                 nimages += 1;
@@ -846,6 +855,9 @@ impl ImportTask for ImportSlruBlocksTask {
         debug!("Importing SLRU segment file {}", self.path);
         let buf = self.storage.get(&self.path).await?;
 
+        // TODO(vlad): Does timestamp to LSN work for imported timelines?
+        // Probably not, since we don't append the `xact_time` to it as in
+        // [`WalIngest::ingest_xact_record`].
         let (kind, segno, start_blk) = self.key_range.start.to_slru_block()?;
         let (_kind, _segno, end_blk) = self.key_range.end.to_slru_block()?;
         let mut blknum = start_blk;
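The `MAX_BYTE_RANGE_SIZE` check above bounds how far the fold may coalesce adjacent block reads, capping each download buffer at 128 MiB. A self-contained sketch of the same coalescing rule (hypothetical `coalesce` helper over `(start, end)` byte ranges; the real code also carries the keys alongside each range):

```rust
const MAX_BYTE_RANGE_SIZE: usize = 128 * 1024 * 1024;

/// Merge adjacent byte ranges, but never let a merged range grow past
/// MAX_BYTE_RANGE_SIZE, so a single ranged read stays bounded in memory.
fn coalesce(ranges: impl IntoIterator<Item = (usize, usize)>) -> Vec<(usize, usize)> {
    let mut out: Vec<(usize, usize)> = Vec::new();
    for (start, end) in ranges {
        match out.last_mut() {
            Some((acc_start, acc_end))
                if *acc_end == start && end - *acc_start <= MAX_BYTE_RANGE_SIZE =>
            {
                // Contiguous and still under the cap: extend the current range.
                *acc_end = end;
            }
            _ => out.push((start, end)),
        }
    }
    out
}

fn main() {
    // Three adjacent 8 KiB pages merge into one ranged read; the gap starts a new one.
    let merged = coalesce([(0, 8192), (8192, 16384), (16384, 24576), (32768, 40960)]);
    assert_eq!(merged, vec![(0, 24576), (32768, 40960)]);
}
```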
diff --git a/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs b/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs
index 34313748b7..bf2d9875c1 100644
--- a/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs
+++ b/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs
@@ -6,7 +6,7 @@ use bytes::Bytes;
 use postgres_ffi::ControlFileData;
 use remote_storage::{
     Download, DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, Listing,
-    ListingObject, RemotePath,
+    ListingObject, RemotePath, RemoteStorageConfig,
 };
 use serde::de::DeserializeOwned;
 use tokio_util::sync::CancellationToken;
@@ -22,11 +22,9 @@ pub async fn new(
     location: &index_part_format::Location,
     cancel: CancellationToken,
 ) -> Result<RemoteStorageWrapper, anyhow::Error> {
-    // FIXME: we probably want some timeout, and we might be able to assume the max file
-    // size on S3 is 1GiB (postgres segment size). But the problem is that the individual
-    // downloaders don't know enough about concurrent downloads to make a guess on the
-    // expected bandwidth and resulting best timeout.
-    let timeout = std::time::Duration::from_secs(24 * 60 * 60);
+    // Downloads should be reasonably sized. We do ranged reads for relblock raw data
+    // and full reads for SLRU segments, which are bounded by Postgres.
+    let timeout = RemoteStorageConfig::DEFAULT_TIMEOUT;
     let location_storage = match location {
         #[cfg(feature = "testing")]
         index_part_format::Location::LocalFs { path } => {
@@ -50,9 +48,12 @@
                 .import_pgdata_aws_endpoint_url
                 .clone()
                 .map(|url| url.to_string()), // by specifying None here, remote_storage/aws-sdk-rust will infer from env
-            concurrency_limit: 100.try_into().unwrap(), // TODO: think about this
-            max_keys_per_list_response: Some(1000), // TODO: think about this
-            upload_storage_class: None, // irrelevant
+            // This matches the default import job concurrency. It is managed
+            // separately from the usual S3 client, but the concern here is
+            // bandwidth usage.
+            concurrency_limit: 128.try_into().unwrap(),
+            max_keys_per_list_response: Some(1000),
+            upload_storage_class: None, // irrelevant
         },
         timeout,
     )
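For context on the `concurrency_limit` choice: the S3 client bounds in-flight requests internally, so with 128 permits and bounded request sizes (capped ranged reads, Postgres-bounded SLRU segments), bandwidth and memory footprint are both capped. A hypothetical illustration of that kind of limiter using a Tokio semaphore (not the `remote_storage` implementation):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// Sketch: at most 128 downloads run concurrently; the rest wait for a permit.
async fn download_all(keys: Vec<String>) {
    let limiter = Arc::new(Semaphore::new(128));
    let mut handles = Vec::new();
    for key in keys {
        let permit = limiter.clone().acquire_owned().await.unwrap();
        handles.push(tokio::spawn(async move {
            // Permit is held for the whole download, bounding in-flight GETs.
            let _permit = permit;
            let _ = key; // ... issue the GET for `key` here ...
        }));
    }
    for handle in handles {
        let _ = handle.await;
    }
}

#[tokio::main]
async fn main() {
    download_all((0..512).map(|i| format!("relblock-{i}")).collect()).await;
}
```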