mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
create_image_layers integration (multifile)
This commit is contained in:
@@ -33,6 +33,7 @@ use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirectio
|
||||
use crate::tenant::storage_layer::{
|
||||
LayerAccessStats, PersistentLayer, ValueReconstructResult, ValueReconstructState,
|
||||
};
|
||||
use crate::tenant::Timeline;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
@@ -47,6 +48,7 @@ use std::io::{Seek, SeekFrom};
|
||||
use std::ops::Range;
|
||||
use std::os::unix::prelude::FileExt;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::OnceCell;
|
||||
use tracing::*;
|
||||
|
||||
@@ -57,7 +59,10 @@ use utils::{
|
||||
};
|
||||
|
||||
use super::filename::ImageFileName;
|
||||
use super::{AsLayerDesc, Layer, LayerAccessStatsReset, PathOrConf, PersistentLayerDesc};
|
||||
use super::{
|
||||
AsLayerDesc, Layer, LayerAccessStatsReset, LayerE, PathOrConf, PersistentLayerDesc,
|
||||
ResidentLayer,
|
||||
};
|
||||
|
||||
///
|
||||
/// Header stored in the beginning of the file
|
||||
@@ -582,7 +587,7 @@ impl ImageLayerWriterInner {
|
||||
///
|
||||
/// Finish writing the image layer.
|
||||
///
|
||||
fn finish(self) -> anyhow::Result<ImageLayer> {
|
||||
fn finish(self, timeline: &Arc<Timeline>) -> anyhow::Result<ResidentLayer> {
|
||||
let index_start_blk =
|
||||
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
|
||||
|
||||
@@ -624,13 +629,6 @@ impl ImageLayerWriterInner {
|
||||
// Note: Because we open the file in write-only mode, we cannot
|
||||
// reuse the same VirtualFile for reading later. That's why we don't
|
||||
// set inner.file here. The first read will have to re-open it.
|
||||
let layer = ImageLayer {
|
||||
path_or_conf: PathOrConf::Conf(self.conf),
|
||||
desc,
|
||||
lsn: self.lsn,
|
||||
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
|
||||
inner: OnceCell::new(),
|
||||
};
|
||||
|
||||
// fsync the file
|
||||
file.sync_all()?;
|
||||
@@ -650,7 +648,9 @@ impl ImageLayerWriterInner {
|
||||
);
|
||||
std::fs::rename(self.path, final_path)?;
|
||||
|
||||
trace!("created image layer {}", layer.path().display());
|
||||
let layer = LayerE::for_written(self.conf, timeline, desc)?;
|
||||
|
||||
trace!("created image layer {}", layer.local_path().display());
|
||||
|
||||
Ok(layer)
|
||||
}
|
||||
@@ -716,8 +716,11 @@ impl ImageLayerWriter {
|
||||
///
|
||||
/// Finish writing the image layer.
|
||||
///
|
||||
pub fn finish(mut self) -> anyhow::Result<ImageLayer> {
|
||||
self.inner.take().unwrap().finish()
|
||||
pub(crate) fn finish(
|
||||
mut self,
|
||||
timeline: &Arc<Timeline>,
|
||||
) -> anyhow::Result<super::ResidentLayer> {
|
||||
self.inner.take().unwrap().finish(timeline)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -93,7 +93,7 @@ use super::config::TenantConf;
|
||||
use super::remote_timeline_client::index::IndexPart;
|
||||
use super::remote_timeline_client::RemoteTimelineClient;
|
||||
use super::storage_layer::{
|
||||
AsLayerDesc, DeltaLayer, ImageLayer, LayerAccessStatsReset, PersistentLayerDesc,
|
||||
AsLayerDesc, DeltaLayer, ImageLayer, LayerAccessStatsReset, PersistentLayerDesc, ResidentLayer,
|
||||
};
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
@@ -813,13 +813,15 @@ impl Timeline {
|
||||
|
||||
// 2. Create new image layers for partitions that have been modified
|
||||
// "enough".
|
||||
let layer_paths_to_upload = self
|
||||
let layers = self
|
||||
.create_image_layers(&partitioning, lsn, false, &image_ctx)
|
||||
.await
|
||||
.map_err(anyhow::Error::from)?;
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
for (path, layer_metadata) in layer_paths_to_upload {
|
||||
remote_client.schedule_layer_file_upload(&path, &layer_metadata)?;
|
||||
for layer in layers {
|
||||
let m = LayerFileMetadata::new(layer.layer_desc().file_size);
|
||||
|
||||
remote_client.schedule_layer_file_upload(layer, &m)?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2922,14 +2924,14 @@ impl Timeline {
|
||||
}
|
||||
|
||||
async fn create_image_layers(
|
||||
&self,
|
||||
self: &Arc<Timeline>,
|
||||
partitioning: &KeyPartitioning,
|
||||
lsn: Lsn,
|
||||
force: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<HashMap<Arc<LayerE>, LayerFileMetadata>, PageReconstructError> {
|
||||
) -> Result<Vec<ResidentLayer>, PageReconstructError> {
|
||||
let timer = self.metrics.create_images_time_histo.start_timer();
|
||||
let mut image_layers: Vec<ImageLayer> = Vec::new();
|
||||
let mut image_layers = Vec::new();
|
||||
|
||||
// We need to avoid holes between generated image layers.
|
||||
// Otherwise LayerMap::image_layer_exists will return false if key range of some layer is covered by more than one
|
||||
@@ -2992,7 +2994,7 @@ impl Timeline {
|
||||
key = key.next();
|
||||
}
|
||||
}
|
||||
let image_layer = image_layer_writer.finish()?;
|
||||
let image_layer = image_layer_writer.finish(self)?;
|
||||
image_layers.push(image_layer);
|
||||
}
|
||||
}
|
||||
@@ -3014,7 +3016,7 @@ impl Timeline {
|
||||
// and fsync them all in parallel.
|
||||
let all_paths = image_layers
|
||||
.iter()
|
||||
.map(|layer| layer.path())
|
||||
.map(|layer| layer.local_path().to_owned())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
par_fsync::par_fsync_async(&all_paths)
|
||||
@@ -3025,34 +3027,23 @@ impl Timeline {
|
||||
.await
|
||||
.context("fsync of timeline dir")?;
|
||||
|
||||
let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());
|
||||
|
||||
let mut guard = self.layers.write().await;
|
||||
let timeline_path = self.conf.timeline_path(&self.tenant_id, &self.timeline_id);
|
||||
|
||||
for l in &image_layers {
|
||||
let path = l.filename();
|
||||
let metadata = timeline_path
|
||||
.join(path.file_name())
|
||||
.metadata()
|
||||
.with_context(|| format!("reading metadata of layer file {}", path.file_name()))?;
|
||||
|
||||
layer_paths_to_upload.insert(path, LayerFileMetadata::new(metadata.len()));
|
||||
|
||||
self.metrics
|
||||
.resident_physical_size_gauge
|
||||
.add(metadata.len());
|
||||
.add(l.layer_desc().file_size);
|
||||
let l = Arc::new(l);
|
||||
l.access_stats().record_residence_event(
|
||||
LayerResidenceStatus::Resident,
|
||||
LayerResidenceEventReason::LayerCreate,
|
||||
);
|
||||
}
|
||||
guard.track_new_image_layers(image_layers);
|
||||
guard.track_new_image_layers(&image_layers);
|
||||
drop_wlock(guard);
|
||||
timer.stop_and_record();
|
||||
|
||||
Ok(layer_paths_to_upload)
|
||||
Ok(image_layers)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -13,7 +13,7 @@ use crate::{
|
||||
layer_map::{BatchedUpdates, LayerMap},
|
||||
storage_layer::{
|
||||
AsLayerDesc, DeltaLayer, ImageLayer, InMemoryLayer, LayerE, PersistentLayer,
|
||||
PersistentLayerDesc, PersistentLayerKey,
|
||||
PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
|
||||
},
|
||||
timeline::compare_arced_layers,
|
||||
},
|
||||
@@ -164,10 +164,10 @@ impl LayerManager {
|
||||
}
|
||||
|
||||
/// Add image layers to the layer map, called from `create_image_layers`.
|
||||
pub(crate) fn track_new_image_layers(&mut self, image_layers: Vec<ImageLayer>) {
|
||||
pub(crate) fn track_new_image_layers(&mut self, image_layers: &[ResidentLayer]) {
|
||||
let mut updates = self.layer_map.batch_update();
|
||||
for layer in image_layers {
|
||||
Self::insert_historic_layer(Arc::new(layer), &mut updates, &mut self.layer_fmgr);
|
||||
Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
|
||||
}
|
||||
updates.flush();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user