mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 18:02:56 +00:00
remote_timeline_client: continued integration work post ResidentLayer
This commit is contained in:
@@ -234,6 +234,7 @@ use crate::metrics::{
|
||||
use crate::task_mgr::shutdown_token;
|
||||
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use crate::tenant::storage_layer::AsLayerDesc;
|
||||
use crate::tenant::upload_queue::Delete;
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
@@ -250,7 +251,7 @@ use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use self::index::IndexPart;
|
||||
|
||||
use super::storage_layer::LayerFileName;
|
||||
use super::storage_layer::{LayerFileName, ResidentLayer};
|
||||
use super::upload_queue::SetDeletedFlagProgress;
|
||||
|
||||
// Occasional network issues and such can cause remote operations to fail, and
|
||||
@@ -600,23 +601,25 @@ impl RemoteTimelineClient {
|
||||
///
|
||||
pub fn schedule_layer_file_upload(
|
||||
self: &Arc<Self>,
|
||||
layer_file_name: &LayerFileName,
|
||||
layer: ResidentLayer,
|
||||
layer_metadata: &LayerFileMetadata,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
|
||||
// FIXME: we might be still including no longer existing files in the index_part because
|
||||
// that consistency is built on strings and gentleman agreements, not Weak<LayerE> which
|
||||
// could be upgraded at the time of rendering of index_part.
|
||||
upload_queue
|
||||
.latest_files
|
||||
.insert(layer_file_name.clone(), layer_metadata.clone());
|
||||
.insert(layer.layer_desc().filename(), layer_metadata.clone());
|
||||
upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
|
||||
|
||||
let op = UploadOp::UploadLayer(layer_file_name.clone(), layer_metadata.clone());
|
||||
info!("scheduled layer file upload {layer}");
|
||||
let op = UploadOp::UploadLayer(layer, layer_metadata.clone());
|
||||
self.calls_unfinished_metric_begin(&op);
|
||||
upload_queue.queued_operations.push_back(op);
|
||||
|
||||
info!("scheduled layer file upload {layer_file_name}");
|
||||
|
||||
// Launch the task immediately, if possible
|
||||
self.launch_queued_tasks(upload_queue);
|
||||
Ok(())
|
||||
@@ -1054,11 +1057,8 @@ impl RemoteTimelineClient {
|
||||
}
|
||||
|
||||
let upload_result: anyhow::Result<()> = match &task.op {
|
||||
UploadOp::UploadLayer(ref layer_file_name, ref layer_metadata) => {
|
||||
let path = &self
|
||||
.conf
|
||||
.timeline_path(&self.tenant_id, &self.timeline_id)
|
||||
.join(layer_file_name.file_name());
|
||||
UploadOp::UploadLayer(ref layer, ref layer_metadata) => {
|
||||
let path = layer.local_path();
|
||||
upload::upload_timeline_layer(
|
||||
self.conf,
|
||||
&self.storage_impl,
|
||||
@@ -1367,6 +1367,7 @@ mod tests {
|
||||
context::RequestContext,
|
||||
tenant::{
|
||||
harness::{TenantHarness, TIMELINE_ID},
|
||||
storage_layer::{LayerE, PersistentLayerDesc},
|
||||
Tenant, Timeline,
|
||||
},
|
||||
DEFAULT_PG_VERSION,
|
||||
@@ -1507,7 +1508,7 @@ mod tests {
|
||||
let TestSetup {
|
||||
harness,
|
||||
tenant: _tenant,
|
||||
timeline: _timeline,
|
||||
timeline,
|
||||
tenant_ctx: _tenant_ctx,
|
||||
remote_fs_dir,
|
||||
client,
|
||||
@@ -1542,15 +1543,56 @@ mod tests {
|
||||
std::fs::write(timeline_path.join(filename.file_name()), content).unwrap();
|
||||
}
|
||||
|
||||
let layer_file_1 = LayerE::for_written(
|
||||
harness.conf,
|
||||
&timeline,
|
||||
PersistentLayerDesc::from_filename(
|
||||
timeline.tenant_id,
|
||||
timeline.timeline_id,
|
||||
layer_file_name_1.clone(),
|
||||
content_1.len() as u64,
|
||||
),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// FIXME: need that api for local files
|
||||
assert!(layer_file_1.needs_download_blocking().unwrap().is_none());
|
||||
|
||||
let layer_file_2 = LayerE::for_written(
|
||||
harness.conf,
|
||||
&timeline,
|
||||
PersistentLayerDesc::from_filename(
|
||||
timeline.tenant_id,
|
||||
timeline.timeline_id,
|
||||
layer_file_name_2.clone(),
|
||||
content_2.len() as u64,
|
||||
),
|
||||
)
|
||||
.unwrap();
|
||||
assert!(layer_file_2.needs_download_blocking().unwrap().is_none());
|
||||
|
||||
let layer_file_3 = LayerE::for_written(
|
||||
harness.conf,
|
||||
&timeline,
|
||||
PersistentLayerDesc::from_filename(
|
||||
timeline.tenant_id,
|
||||
timeline.timeline_id,
|
||||
layer_file_name_3.clone(),
|
||||
content_3.len() as u64,
|
||||
),
|
||||
)
|
||||
.unwrap();
|
||||
assert!(layer_file_3.needs_download_blocking().unwrap().is_none());
|
||||
|
||||
client
|
||||
.schedule_layer_file_upload(
|
||||
&layer_file_name_1,
|
||||
layer_file_1.clone(),
|
||||
&LayerFileMetadata::new(content_1.len() as u64),
|
||||
)
|
||||
.unwrap();
|
||||
client
|
||||
.schedule_layer_file_upload(
|
||||
&layer_file_name_2,
|
||||
layer_file_2.clone(),
|
||||
&LayerFileMetadata::new(content_2.len() as u64),
|
||||
)
|
||||
.unwrap();
|
||||
@@ -1614,7 +1656,7 @@ mod tests {
|
||||
// Schedule upload and then a deletion. Check that the deletion is queued
|
||||
client
|
||||
.schedule_layer_file_upload(
|
||||
&layer_file_name_3,
|
||||
layer_file_3.clone(),
|
||||
&LayerFileMetadata::new(content_3.len() as u64),
|
||||
)
|
||||
.unwrap();
|
||||
@@ -1661,7 +1703,7 @@ mod tests {
|
||||
let TestSetup {
|
||||
harness,
|
||||
tenant: _tenant,
|
||||
timeline: _timeline,
|
||||
timeline,
|
||||
client,
|
||||
..
|
||||
} = TestSetup::new("metrics").await.unwrap();
|
||||
@@ -1681,6 +1723,21 @@ mod tests {
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let layer_file_1 = LayerE::for_written(
|
||||
harness.conf,
|
||||
&timeline,
|
||||
PersistentLayerDesc::from_filename(
|
||||
timeline.tenant_id,
|
||||
timeline.timeline_id,
|
||||
layer_file_name_1.clone(),
|
||||
content_1.len() as u64,
|
||||
),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// FIXME: need that api for local files that actually exist
|
||||
assert!(layer_file_1.needs_download_blocking().unwrap().is_none());
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
struct BytesStartedFinished {
|
||||
started: Option<usize>,
|
||||
@@ -1707,7 +1764,7 @@ mod tests {
|
||||
|
||||
client
|
||||
.schedule_layer_file_upload(
|
||||
&layer_file_name_1,
|
||||
layer_file_1.clone(),
|
||||
&LayerFileMetadata::new(content_1.len() as u64),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -1053,6 +1053,12 @@ impl std::fmt::Display for ResidentLayer {
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for ResidentLayer {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.owner)
|
||||
}
|
||||
}
|
||||
|
||||
impl ResidentLayer {
|
||||
pub(crate) fn local_path(&self) -> &std::path::Path {
|
||||
&self.owner.path
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use crate::metrics::RemoteOpFileKind;
|
||||
|
||||
use super::storage_layer::LayerFileName;
|
||||
use super::storage_layer::ResidentLayer;
|
||||
use crate::tenant::metadata::TimelineMetadata;
|
||||
use crate::tenant::remote_timeline_client::index::IndexPart;
|
||||
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
@@ -210,7 +211,7 @@ pub(crate) struct Delete {
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum UploadOp {
|
||||
/// Upload a layer file
|
||||
UploadLayer(LayerFileName, LayerFileMetadata),
|
||||
UploadLayer(ResidentLayer, LayerFileMetadata),
|
||||
|
||||
/// Upload the metadata file
|
||||
UploadMetadata(IndexPart, Lsn),
|
||||
@@ -225,13 +226,8 @@ pub(crate) enum UploadOp {
|
||||
impl std::fmt::Display for UploadOp {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
match self {
|
||||
UploadOp::UploadLayer(path, metadata) => {
|
||||
write!(
|
||||
f,
|
||||
"UploadLayer({}, size={:?})",
|
||||
path.file_name(),
|
||||
metadata.file_size()
|
||||
)
|
||||
UploadOp::UploadLayer(layer, metadata) => {
|
||||
write!(f, "UploadLayer({}, size={:?})", layer, metadata.file_size())
|
||||
}
|
||||
UploadOp::UploadMetadata(_, lsn) => write!(f, "UploadMetadata(lsn: {})", lsn),
|
||||
UploadOp::Delete(delete) => write!(
|
||||
|
||||
Reference in New Issue
Block a user