Perform repartitioning in compaction thread

refer #1441
This commit is contained in:
Konstantin Knizhnik
2022-03-31 16:28:07 +03:00
committed by Anastasia Lubennikova
parent 4c9447589a
commit 1f0b406b63
3 changed files with 17 additions and 11 deletions

View File

@@ -41,6 +41,7 @@ use crate::repository::{
GcResult, Repository, RepositoryTimeline, Timeline, TimelineSyncStatusUpdate, TimelineWriter,
};
use crate::repository::{Key, Value};
use crate::tenant_mgr;
use crate::thread_mgr;
use crate::virtual_file::VirtualFile;
use crate::walreceiver::IS_WAL_RECEIVER;
@@ -1588,6 +1589,10 @@ impl LayeredTimeline {
let target_file_size = self.conf.checkpoint_distance;
// Define partitioning schema if needed
tenant_mgr::get_timeline_for_tenant_load(self.tenantid, self.timelineid)?
.repartition(self.get_last_record_lsn())?;
// 1. The partitioning was already done by the code in
// pgdatadir_mapping.rs. We just use it here.
let partitioning_guard = self.partitioning.read().unwrap();

View File

@@ -388,6 +388,17 @@ impl<R: Repository> DatadirTimeline<R> {
Ok(result.to_keyspace())
}
pub fn repartition(&self, lsn: Lsn) -> Result<()> {
let last_partitioning = self.last_partitioning.load();
if last_partitioning == Lsn(0) || lsn.0 - last_partitioning.0 > self.repartition_threshold {
let keyspace = self.collect_keyspace(lsn)?;
let partitioning = keyspace.partition(TARGET_FILE_SIZE_BYTES);
self.tline.hint_partitioning(partitioning, lsn)?;
self.last_partitioning.store(lsn);
}
Ok(())
}
}
/// DatadirModification represents an operation to ingest an atomic set of
@@ -767,7 +778,6 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
pub fn commit(self) -> Result<()> {
let writer = self.tline.tline.writer();
let last_partitioning = self.tline.last_partitioning.load();
let pending_nblocks = self.pending_nblocks;
for (key, value) in self.pending_updates {
@@ -779,15 +789,6 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
writer.finish_write(self.lsn);
if last_partitioning == Lsn(0)
|| self.lsn.0 - last_partitioning.0 > self.tline.repartition_threshold
{
let keyspace = self.tline.collect_keyspace(self.lsn)?;
let partitioning = keyspace.partition(TARGET_FILE_SIZE_BYTES);
self.tline.tline.hint_partitioning(partitioning, self.lsn)?;
self.tline.last_partitioning.store(self.lsn);
}
if pending_nblocks != 0 {
self.tline.current_logical_size.fetch_add(
pending_nblocks * pg_constants::BLCKSZ as isize,

View File

@@ -286,7 +286,7 @@ fn bootstrap_timeline<R: Repository>(
let timeline = repo.create_empty_timeline(tli, lsn)?;
let mut page_tline: DatadirTimeline<R> = DatadirTimeline::new(timeline, u64::MAX);
import_datadir::import_timeline_from_postgres_datadir(&pgdata_path, &mut page_tline, lsn)?;
page_tline.tline.checkpoint(CheckpointConfig::Forced)?;
page_tline.tline.checkpoint(CheckpointConfig::Flush)?;
println!(
"created initial timeline {} timeline.lsn {}",