Process chunks in parallel

Stas Kelvich
2024-09-14 14:07:35 +01:00
parent 2e7e5f4f3a
commit 2bf9a350ef


@@ -7,7 +7,7 @@ use camino::{Utf8Path, Utf8PathBuf};
use itertools::Itertools;
use pageserver_api::{key::{rel_block_to_key, rel_dir_to_key, rel_size_to_key, relmap_file_key, DBDIR_KEY}, reltag::RelTag};
use postgres_ffi::{pg_constants, relfile_utils::parse_relfilename, ControlFileData, BLCKSZ};
use tokio::io::AsyncRead;
use tokio::{io::AsyncRead, task::{self, JoinHandle}};
use tracing::debug;
use utils::{id::{NodeId, TenantId, TimelineId}, shard::{ShardCount, ShardNumber, TenantShardId}};
use walkdir::WalkDir;
@@ -36,7 +36,6 @@ use std::collections::HashSet;
use std::ops::Range;
pub struct PgImportEnv {
ctx: RequestContext,
conf: &'static PageServerConf,
tli: TimelineId,
tsi: TenantShardId,
@@ -51,7 +50,6 @@ pub struct PgImportEnv {
impl PgImportEnv {
pub async fn init(dstdir: &Utf8Path, tenant_id: TenantId, timeline_id: TimelineId) -> anyhow::Result<PgImportEnv> {
let ctx: RequestContext = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
let config = toml_edit::Document::new();
let conf = PageServerConf::parse_and_validate(
NodeId(42),
@@ -67,7 +65,6 @@ impl PgImportEnv {
};
Ok(PgImportEnv {
ctx,
conf,
tli: timeline_id,
tsi,
@@ -127,24 +124,47 @@ impl PgImportEnv {
let checkpoint_buf = control_file.checkPointCopy.encode()?;
self.tasks.push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(CHECKPOINT_KEY, checkpoint_buf)));
// Execution time
// Assign disjoint parts of the key space to the parallel jobs
let mut last_end_key = Key::MIN;
let mut current_chunk = Vec::new();
let mut current_chunk_size: usize = 0;
let mut parallel_jobs = Vec::new();
for task in std::mem::take(&mut self.tasks).into_iter() {
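// Cut a new chunk once adding this task would push it past the ~1 GiB budget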
if current_chunk_size + task.total_size() > 1024*1024*1024 {
let key_range = last_end_key..task.key_range().start;
self.flush_chunk(key_range.clone(), std::mem::take(&mut current_chunk)).await?;
parallel_jobs.push(ChunkProcessingJob::new(
key_range.clone(),
std::mem::take(&mut current_chunk),
self
));
last_end_key = key_range.end;
current_chunk_size = 0;
}
current_chunk_size += task.total_size();
current_chunk.push(task);
}
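// The final chunk covers everything from the last cut point to the end of the key space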
self.flush_chunk(last_end_key..Key::NON_L0_MAX, current_chunk).await?;
parallel_jobs.push(ChunkProcessingJob::new(
last_end_key..Key::NON_L0_MAX,
current_chunk,
self
));
// Should we do anything about the WAL?
// Start all jobs simultaneously
// TODO: semaphore?
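// (one option: acquire a tokio::sync::Semaphore permit before each spawn; sketched after the diff)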
let mut handles = vec![];
for job in parallel_jobs {
let handle: JoinHandle<anyhow::Result<PersistentLayerDesc>> = task::spawn(async move {
let layerdesc = job.run().await?;
Ok(layerdesc)
});
handles.push(handle);
}
// Wait for all jobs to complete
for handle in handles {
let layerdesc = handle.await??;
self.layers.push(layerdesc);
}
// Create index_part.json file
self.create_index_part(&control_file).await?;
@@ -284,26 +304,6 @@ impl PgImportEnv {
Ok(())
}
async fn flush_chunk(&mut self, key_range: Range<Key>, chunk: Vec<AnyImportTask>) -> anyhow::Result<()> {
let mut layer = ImageLayerWriter::new(
&self.conf,
self.tli,
self.tsi,
&key_range,
self.pgdata_lsn,
&self.ctx,
).await?;
for task in chunk {
task.doit(&mut layer, &self.ctx).await?;
}
let layerdesc = layer.finish_raw(&self.ctx).await?;
self.layers.push(layerdesc);
Ok(())
}
}
//
@@ -593,3 +593,58 @@ impl From<ImportSlruBlocksTask> for AnyImportTask {
Self::SlruBlocks(t)
}
}
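/// A self-contained chunk of the import: owns everything needed to write one
/// image layer covering `range` from a spawned task, with no borrows of the env.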
struct ChunkProcessingJob {
range: Range<Key>,
tasks: Vec<AnyImportTask>,
dstdir: Utf8PathBuf,
tenant_id: TenantId,
timeline_id: TimelineId,
pgdata_lsn: Lsn,
}
impl ChunkProcessingJob {
fn new(range: Range<Key>, tasks: Vec<AnyImportTask>, env: &PgImportEnv) -> Self {
assert!(env.pgdata_lsn.is_valid());
Self {
range,
tasks,
dstdir: env.conf.workdir.clone(),
tenant_id: env.tsi.tenant_id,
timeline_id: env.tli,
pgdata_lsn: env.pgdata_lsn,
}
}
async fn run(self) -> anyhow::Result<PersistentLayerDesc> {
let ctx: RequestContext = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
let config = toml_edit::Document::new();
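// Re-parse the config inside the task; Box::leak turns the box into the
// &'static reference the annotation below requires. Leaking one small
// config per job is acceptable for a one-shot import tool.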
let conf: &'static PageServerConf = Box::leak(Box::new(PageServerConf::parse_and_validate(
NodeId(42),
&config,
&self.dstdir
)?));
let tsi = TenantShardId {
tenant_id: self.tenant_id,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
};
let mut layer = ImageLayerWriter::new(
&conf,
self.timeline_id,
tsi,
&self.range,
self.pgdata_lsn,
&ctx,
).await?;
for task in self.tasks {
task.doit(&mut layer, &ctx).await?;
}
let layerdesc = layer.finish_raw(&ctx).await?;
Ok(layerdesc)
}
}
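For the `// TODO: semaphore?` above: one way to bound how many chunks are written at once is a `tokio::sync::Semaphore`. A minimal sketch of the spawn loop with that change; `MAX_PARALLEL_JOBS` is a hypothetical cap, not part of this commit:

use std::sync::Arc;
use tokio::sync::Semaphore;

const MAX_PARALLEL_JOBS: usize = 8; // hypothetical cap, tune to available memory

let semaphore = Arc::new(Semaphore::new(MAX_PARALLEL_JOBS));
let mut handles = vec![];
for job in parallel_jobs {
    // Wait for a free slot before spawning, so at most
    // MAX_PARALLEL_JOBS chunks are in flight at any time.
    let permit = Arc::clone(&semaphore).acquire_owned().await?;
    handles.push(task::spawn(async move {
        let layerdesc = job.run().await?;
        drop(permit); // release the slot for the next job
        anyhow::Ok(layerdesc)
    }));
}

The permit is acquired before `task::spawn` and moved into the task, so the loop itself blocks once the cap is reached; the rest of the join logic stays as in the commit.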