Rework timeline batching

Author: Kirill Bulatov, 2022-05-02 10:46:13 +03:00
Committed by: Kirill Bulatov
parent de37f982db
commit 10e4da3997
4 changed files with 292 additions and 726 deletions


@@ -267,7 +267,7 @@ async fn timeline_attach_handler(request: Request<Body>) -> Result<Response<Body
drop(index_accessor);
}
-let new_timeline = match try_download_shard_data(state, sync_id).await {
+let new_timeline = match try_download_index_part_data(state, sync_id).await {
Ok(Some(mut new_timeline)) => {
tokio::fs::create_dir_all(state.conf.timeline_path(&timeline_id, &tenant_id))
.await
@@ -300,11 +300,11 @@ async fn timeline_attach_handler(request: Request<Body>) -> Result<Response<Body
json_response(StatusCode::ACCEPTED, ())
}
-async fn try_download_shard_data(
+async fn try_download_index_part_data(
state: &State,
sync_id: ZTenantTimelineId,
) -> anyhow::Result<Option<RemoteTimeline>> {
-let shard = match state.remote_storage.as_ref() {
+let index_part = match state.remote_storage.as_ref() {
Some(GenericRemoteStorage::Local(local_storage)) => {
storage_sync::download_index_part(state.conf, local_storage, sync_id).await
}
@@ -313,18 +313,15 @@ async fn try_download_shard_data(
}
None => return Ok(None),
}
.with_context(|| format!("Failed to download index shard for timeline {}", sync_id))?;
.with_context(|| format!("Failed to download index part for timeline {sync_id}"))?;
let timeline_path = state
.conf
.timeline_path(&sync_id.timeline_id, &sync_id.tenant_id);
-RemoteTimeline::from_index_part(&timeline_path, shard)
+RemoteTimeline::from_index_part(&timeline_path, index_part)
.map(Some)
.with_context(|| {
-format!(
-"Failed to convert index shard into remote timeline for timeline {}",
-sync_id
-)
+format!("Failed to convert index part into remote timeline for timeline {sync_id}")
})
}
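Besides renaming "shard" to "index part", the file above moves its error strings to Rust 2021 captured format arguments ({sync_id} in the string instead of a trailing "{}", sync_id). A minimal standalone sketch of that pattern together with anyhow's Context on Option; find_entry and id are illustrative, not from this codebase:

use anyhow::{Context, Result};

fn find_entry(id: u32) -> Result<String> {
    let entry: Option<String> = None; // stand-in for a real lookup
    entry
        // Since Rust 1.58, `id` is captured from scope inside the format
        // string, replacing the older `format!("... {}", id)` form seen
        // on the removed lines.
        .with_context(|| format!("Failed to find entry for id {id}"))
}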


@@ -455,7 +455,7 @@ enum LayeredTimelineEntry {
impl LayeredTimelineEntry {
fn timeline_id(&self) -> ZTimelineId {
match self {
-LayeredTimelineEntry::Loaded(timeline) => timeline.timelineid,
+LayeredTimelineEntry::Loaded(timeline) => timeline.timeline_id,
LayeredTimelineEntry::Unloaded { id, .. } => *id,
}
}
@@ -615,21 +615,17 @@ impl LayeredRepository {
fn load_local_timeline(
&self,
-timelineid: ZTimelineId,
+timeline_id: ZTimelineId,
timelines: &mut HashMap<ZTimelineId, LayeredTimelineEntry>,
) -> anyhow::Result<Arc<LayeredTimeline>> {
-let metadata = load_metadata(self.conf, timelineid, self.tenant_id)
+let metadata = load_metadata(self.conf, timeline_id, self.tenant_id)
.context("failed to load metadata")?;
let disk_consistent_lsn = metadata.disk_consistent_lsn();
let ancestor = metadata
.ancestor_timeline()
.map(|ancestor_timeline_id| {
-trace!(
-"loading {}'s ancestor {}",
-timelineid,
-&ancestor_timeline_id
-);
+trace!("loading {timeline_id}'s ancestor {}", &ancestor_timeline_id);
self.get_timeline_load_internal(ancestor_timeline_id, timelines)
})
.transpose()
@@ -643,7 +639,7 @@ impl LayeredRepository {
Arc::clone(&self.tenant_conf),
metadata,
ancestor,
-timelineid,
+timeline_id,
self.tenant_id,
Arc::clone(&self.walredo_mgr),
self.upload_layers,
@@ -902,8 +898,8 @@ pub struct LayeredTimeline {
conf: &'static PageServerConf,
tenant_conf: Arc<RwLock<TenantConfOpt>>,
-tenantid: ZTenantId,
-timelineid: ZTimelineId,
+tenant_id: ZTenantId,
+timeline_id: ZTimelineId,
layers: RwLock<LayerMap>,
@@ -1177,50 +1173,50 @@ impl LayeredTimeline {
tenant_conf: Arc<RwLock<TenantConfOpt>>,
metadata: TimelineMetadata,
ancestor: Option<LayeredTimelineEntry>,
-timelineid: ZTimelineId,
-tenantid: ZTenantId,
+timeline_id: ZTimelineId,
+tenant_id: ZTenantId,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
upload_layers: bool,
) -> LayeredTimeline {
let reconstruct_time_histo = RECONSTRUCT_TIME
-.get_metric_with_label_values(&[&tenantid.to_string(), &timelineid.to_string()])
+.get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()])
.unwrap();
let materialized_page_cache_hit_counter = MATERIALIZED_PAGE_CACHE_HIT
-.get_metric_with_label_values(&[&tenantid.to_string(), &timelineid.to_string()])
+.get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()])
.unwrap();
let flush_time_histo = STORAGE_TIME
.get_metric_with_label_values(&[
"layer flush",
-&tenantid.to_string(),
-&timelineid.to_string(),
+&tenant_id.to_string(),
+&timeline_id.to_string(),
])
.unwrap();
let compact_time_histo = STORAGE_TIME
.get_metric_with_label_values(&[
"compact",
-&tenantid.to_string(),
-&timelineid.to_string(),
+&tenant_id.to_string(),
+&timeline_id.to_string(),
])
.unwrap();
let create_images_time_histo = STORAGE_TIME
.get_metric_with_label_values(&[
"create images",
-&tenantid.to_string(),
-&timelineid.to_string(),
+&tenant_id.to_string(),
+&timeline_id.to_string(),
])
.unwrap();
let last_record_gauge = LAST_RECORD_LSN
-.get_metric_with_label_values(&[&tenantid.to_string(), &timelineid.to_string()])
+.get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()])
.unwrap();
let wait_lsn_time_histo = WAIT_LSN_TIME
-.get_metric_with_label_values(&[&tenantid.to_string(), &timelineid.to_string()])
+.get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()])
.unwrap();
LayeredTimeline {
conf,
tenant_conf,
-timelineid,
-tenantid,
+timeline_id,
+tenant_id,
layers: RwLock::new(LayerMap::default()),
walredo_mgr,
@@ -1272,7 +1268,7 @@ impl LayeredTimeline {
// Scan timeline directory and create ImageFileName and DeltaFilename
// structs representing all files on disk
-let timeline_path = self.conf.timeline_path(&self.timelineid, &self.tenantid);
+let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
for direntry in fs::read_dir(timeline_path)? {
let direntry = direntry?;
@@ -1284,7 +1280,7 @@ impl LayeredTimeline {
if imgfilename.lsn > disk_consistent_lsn {
warn!(
"found future image layer {} on timeline {} disk_consistent_lsn is {}",
-imgfilename, self.timelineid, disk_consistent_lsn
+imgfilename, self.timeline_id, disk_consistent_lsn
);
rename_to_backup(direntry.path())?;
@@ -1292,7 +1288,7 @@ impl LayeredTimeline {
}
let layer =
-ImageLayer::new(self.conf, self.timelineid, self.tenantid, &imgfilename);
+ImageLayer::new(self.conf, self.timeline_id, self.tenant_id, &imgfilename);
trace!("found layer {}", layer.filename().display());
layers.insert_historic(Arc::new(layer));
@@ -1307,7 +1303,7 @@ impl LayeredTimeline {
if deltafilename.lsn_range.end > disk_consistent_lsn + 1 {
warn!(
"found future delta layer {} on timeline {} disk_consistent_lsn is {}",
-deltafilename, self.timelineid, disk_consistent_lsn
+deltafilename, self.timeline_id, disk_consistent_lsn
);
rename_to_backup(direntry.path())?;
@@ -1315,7 +1311,7 @@ impl LayeredTimeline {
}
let layer =
-DeltaLayer::new(self.conf, self.timelineid, self.tenantid, &deltafilename);
+DeltaLayer::new(self.conf, self.timeline_id, self.tenant_id, &deltafilename);
trace!("found layer {}", layer.filename().display());
layers.insert_historic(Arc::new(layer));
@@ -1497,7 +1493,7 @@ impl LayeredTimeline {
// FIXME: It's pointless to check the cache for things that are not 8kB pages.
// We should look at the key to determine if it's a cacheable object
let (lsn, read_guard) =
-cache.lookup_materialized_page(self.tenantid, self.timelineid, key, lsn)?;
+cache.lookup_materialized_page(self.tenant_id, self.timeline_id, key, lsn)?;
let img = Bytes::from(read_guard.to_vec());
Some((lsn, img))
}
@@ -1509,7 +1505,7 @@ impl LayeredTimeline {
.with_context(|| {
format!(
"Ancestor is missing. Timeline id: {} Ancestor id {:?}",
-self.timelineid,
+self.timeline_id,
self.get_ancestor_timeline_id(),
)
})?
@@ -1517,7 +1513,7 @@ impl LayeredTimeline {
.with_context(|| {
format!(
"Ancestor timeline is not is not loaded. Timeline id: {} Ancestor id {:?}",
-self.timelineid,
+self.timeline_id,
self.get_ancestor_timeline_id(),
)
})?;
@@ -1554,12 +1550,12 @@ impl LayeredTimeline {
trace!(
"creating layer for write at {}/{} for record at {}",
-self.timelineid,
+self.timeline_id,
start_lsn,
lsn
);
let new_layer =
-InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, start_lsn)?;
+InMemoryLayer::create(self.conf, self.timeline_id, self.tenant_id, start_lsn)?;
let layer_rc = Arc::new(new_layer);
layers.open_layer = Some(Arc::clone(&layer_rc));
@@ -1633,8 +1629,8 @@ impl LayeredTimeline {
let self_clone = Arc::clone(self);
thread_mgr::spawn(
thread_mgr::ThreadKind::LayerFlushThread,
-Some(self.tenantid),
-Some(self.timelineid),
+Some(self.tenant_id),
+Some(self.timeline_id),
"layer flush thread",
false,
move || self_clone.flush_frozen_layers(false),
@@ -1703,7 +1699,7 @@ impl LayeredTimeline {
// them all in parallel.
par_fsync::par_fsync(&[
new_delta_path.clone(),
-self.conf.timeline_path(&self.timelineid, &self.tenantid),
+self.conf.timeline_path(&self.timeline_id, &self.tenant_id),
])?;
fail_point!("checkpoint-before-sync");
@@ -1775,8 +1771,8 @@ impl LayeredTimeline {
LayeredRepository::save_metadata(
self.conf,
-self.timelineid,
-self.tenantid,
+self.timeline_id,
+self.tenant_id,
&metadata,
false,
)?;
@@ -1786,8 +1782,8 @@ impl LayeredTimeline {
if self.upload_layers.load(atomic::Ordering::Relaxed) {
storage_sync::schedule_layer_upload(
-self.tenantid,
-self.timelineid,
+self.tenant_id,
+self.timeline_id,
HashSet::from([new_delta_path]),
Some(metadata),
);
@@ -1840,7 +1836,8 @@ impl LayeredTimeline {
let target_file_size = self.get_checkpoint_distance();
// Define partitioning schema if needed
-if let Ok(pgdir) = tenant_mgr::get_local_timeline_with_load(self.tenantid, self.timelineid)
+if let Ok(pgdir) =
+    tenant_mgr::get_local_timeline_with_load(self.tenant_id, self.timeline_id)
{
let (partitioning, lsn) = pgdir.repartition(
self.get_last_record_lsn(),
@@ -1858,8 +1855,8 @@ impl LayeredTimeline {
}
if self.upload_layers.load(atomic::Ordering::Relaxed) {
storage_sync::schedule_layer_upload(
-self.tenantid,
-self.timelineid,
+self.tenant_id,
+self.timeline_id,
layer_paths_to_upload,
None,
);
@@ -1909,7 +1906,7 @@ impl LayeredTimeline {
let img_range =
partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end;
let mut image_layer_writer =
-ImageLayerWriter::new(self.conf, self.timelineid, self.tenantid, &img_range, lsn)?;
+ImageLayerWriter::new(self.conf, self.timeline_id, self.tenant_id, &img_range, lsn)?;
for range in &partition.ranges {
let mut key = range.start;
@@ -1932,7 +1929,7 @@ impl LayeredTimeline {
// and fsync them all in parallel.
par_fsync::par_fsync(&[
image_layer.path(),
-self.conf.timeline_path(&self.timelineid, &self.tenantid),
+self.conf.timeline_path(&self.timeline_id, &self.tenant_id),
])?;
// FIXME: Do we need to do something to upload it to remote storage here?
@@ -2008,8 +2005,8 @@ impl LayeredTimeline {
if writer.is_none() {
writer = Some(DeltaLayerWriter::new(
self.conf,
-self.timelineid,
-self.tenantid,
+self.timeline_id,
+self.tenant_id,
key,
lsn_range.clone(),
)?);
@@ -2027,7 +2024,7 @@ impl LayeredTimeline {
let mut layer_paths: Vec<PathBuf> = new_layers.iter().map(|l| l.path()).collect();
// also sync the directory
-layer_paths.push(self.conf.timeline_path(&self.timelineid, &self.tenantid));
+layer_paths.push(self.conf.timeline_path(&self.timeline_id, &self.tenant_id));
// Fsync all the layer files and directory using multiple threads to
// minimize latency.
@@ -2057,14 +2054,14 @@ impl LayeredTimeline {
if self.upload_layers.load(atomic::Ordering::Relaxed) {
storage_sync::schedule_layer_upload(
-self.tenantid,
-self.timelineid,
+self.tenant_id,
+self.timeline_id,
new_layer_paths,
None,
);
storage_sync::schedule_layer_delete(
-self.tenantid,
-self.timelineid,
+self.tenant_id,
+self.timeline_id,
layer_paths_do_delete,
);
}
@@ -2121,7 +2118,7 @@ impl LayeredTimeline {
let cutoff = gc_info.cutoff;
let pitr = gc_info.pitr;
let _enter = info_span!("garbage collection", timeline = %self.timelineid, tenant = %self.tenantid, cutoff = %cutoff).entered();
let _enter = info_span!("garbage collection", timeline = %self.timeline_id, tenant = %self.tenant_id, cutoff = %cutoff).entered();
// We need to ensure that no one branches at a point before latest_gc_cutoff_lsn.
// See branch_timeline() for details.
@@ -2254,8 +2251,8 @@ impl LayeredTimeline {
if self.upload_layers.load(atomic::Ordering::Relaxed) {
storage_sync::schedule_layer_delete(
-self.tenantid,
-self.timelineid,
+self.tenant_id,
+self.timeline_id,
layer_paths_to_delete,
);
}
@@ -2323,8 +2320,8 @@ impl LayeredTimeline {
if img.len() == page_cache::PAGE_SZ {
let cache = page_cache::get();
cache.memorize_materialized_page(
-self.tenantid,
-self.timelineid,
+self.tenant_id,
+self.timeline_id,
key,
last_rec_lsn,
&img,

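A recurring pattern in the file above is fetching per-timeline children of static metric vectors keyed by tenant and timeline id (RECONSTRUCT_TIME, STORAGE_TIME, LAST_RECORD_LSN, and so on). A rough standalone sketch of the same pattern with the prometheus crate; DEMO_TIME is a hypothetical metric, not one of the pageserver's:

use lazy_static::lazy_static;
use prometheus::{register_histogram_vec, Histogram, HistogramVec};

lazy_static! {
    // Hypothetical histogram vector with the same label scheme as the diff.
    static ref DEMO_TIME: HistogramVec = register_histogram_vec!(
        "demo_seconds",
        "Time spent in a demo operation",
        &["tenant_id", "timeline_id"]
    )
    .unwrap();
}

fn per_timeline_histogram(tenant_id: &str, timeline_id: &str) -> Histogram {
    // Fetches (or lazily creates) the child for this label pair, mirroring the
    // .get_metric_with_label_values(...).unwrap() calls in the constructor hunk.
    DEMO_TIME
        .get_metric_with_label_values(&[tenant_id, timeline_id])
        .unwrap()
}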
File diff suppressed because it is too large.


@@ -8,7 +8,7 @@ use std::{
sync::Arc,
};
-use anyhow::{Context, Ok};
+use anyhow::{anyhow, Context, Ok};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use tokio::sync::RwLock;
@@ -113,7 +113,7 @@ impl RemoteTimelineIndex {
awaits_download: bool,
) -> anyhow::Result<()> {
self.timeline_entry_mut(id)
-.ok_or_else(|| anyhow::anyhow!("unknown timeline sync {}", id))?
+.ok_or_else(|| anyhow!("unknown timeline sync {id}"))?
.awaits_download = awaits_download;
Ok(())
}
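The last hunk's ok_or_else is a common anyhow idiom: turn a missing value into a typed error while capturing the identifier inline in the format string. A small self-contained sketch under the assumption of a HashMap-backed index; set_flag and the map are illustrative, not the real RemoteTimelineIndex API:

use anyhow::anyhow;
use std::collections::HashMap;

fn set_flag(entries: &mut HashMap<u64, bool>, id: u64) -> anyhow::Result<()> {
    let entry = entries
        .get_mut(&id)
        // With `anyhow` imported directly, the call shortens from
        // `anyhow::anyhow!(...)`, and `id` is captured in the format string.
        .ok_or_else(|| anyhow!("unknown timeline sync {id}"))?;
    *entry = true;
    Ok(())
}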