Compare commits

..

1 Commits

Author SHA1 Message Date
Joonas Koivunen
f55baa66e8 feat: noisier easier to debug completion 2023-08-07 10:59:44 +03:00
24 changed files with 386 additions and 556 deletions

View File

@@ -10,7 +10,6 @@ use anyhow::Context;
use aws_config::{
environment::credentials::EnvironmentVariableCredentialsProvider,
imds::credentials::ImdsCredentialsProvider, meta::credentials::CredentialsProviderChain,
provider_config::ProviderConfig, web_identity_token::WebIdentityTokenCredentialsProvider,
};
use aws_credential_types::cache::CredentialsCache;
use aws_sdk_s3::{
@@ -68,29 +67,18 @@ impl S3Bucket {
aws_config.bucket_name
);
let region = Some(Region::new(aws_config.bucket_region.clone()));
let credentials_provider = {
// uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
CredentialsProviderChain::first_try(
"env",
EnvironmentVariableCredentialsProvider::new(),
)
// uses "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
// needed to access remote extensions bucket
.or_else("token", {
let provider_conf = ProviderConfig::without_region().with_region(region.clone());
WebIdentityTokenCredentialsProvider::builder()
.configure(&provider_conf)
.build()
})
// uses imds v2
.or_else("imds", ImdsCredentialsProvider::builder().build())
};
let mut config_builder = Config::builder()
.region(region)
.region(Region::new(aws_config.bucket_region.clone()))
.credentials_cache(CredentialsCache::lazy())
.credentials_provider(credentials_provider);

View File

@@ -1,16 +1,57 @@
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
/// While a reference is kept around, the associated [`Barrier::wait`] will wait.
///
/// Can be cloned, moved and kept around in futures as "guard objects".
#[derive(Clone)]
pub struct Completion(mpsc::Sender<()>);
pub struct Completion(Arc<Shared>);
impl Clone for Completion {
#[track_caller]
fn clone(&self) -> Self {
let in_progress = self
.0
.completions_in_progress
.fetch_add(1, Ordering::Release);
assert!(in_progress < usize::MAX - 1);
let waiters = Arc::strong_count(&self.0)
.checked_sub(in_progress)
.unwrap_or(0);
let id = self.0.id;
let location = std::panic::Location::caller();
tracing::info!(id, waiters, in_progress, %location, "cloning");
Self(self.0.clone())
}
}
impl Drop for Completion {
fn drop(&mut self) {
let now = self
.0
.completions_in_progress
.fetch_update(Ordering::Release, Ordering::Relaxed, |x| x.checked_sub(1))
.expect("should not have underflown");
if now == 0 {
let count = Arc::strong_count(&self.0);
let id = self.0.id;
tracing::info!(id, waiters = count - 1, "notifying waiters");
self.0.notify.notify_waiters();
}
}
}
/// Barrier will wait until all clones of [`Completion`] have been dropped.
#[derive(Clone)]
pub struct Barrier(Arc<Mutex<mpsc::Receiver<()>>>);
pub struct Barrier(Arc<Shared>);
struct Shared {
id: usize,
notify: tokio::sync::Notify,
completions_in_progress: AtomicUsize,
}
impl Default for Barrier {
fn default() -> Self {
@@ -21,7 +62,27 @@ impl Default for Barrier {
impl Barrier {
pub async fn wait(self) {
self.0.lock().await.recv().await;
loop {
let in_progress = self.0.completions_in_progress.load(Ordering::Acquire);
if in_progress == 0 {
tracing::info!(id = self.0.id, "wait complete!");
break;
} else {
let waiters = Arc::strong_count(&self.0)
.checked_sub(in_progress)
// there might be drift between the two, but we are still waiting
.unwrap_or(1);
tracing::info!(id = self.0.id, waiters, in_progress, "waiting");
drop(
tokio::time::timeout(
std::time::Duration::from_millis(100),
self.0.notify.notified(),
)
.await,
);
}
}
}
pub async fn maybe_wait(barrier: Option<Barrier>) {
@@ -41,9 +102,19 @@ impl PartialEq for Barrier {
impl Eq for Barrier {}
/// Create new Guard and Barrier pair.
#[track_caller]
pub fn channel() -> (Completion, Barrier) {
let (tx, rx) = mpsc::channel::<()>(1);
let rx = Mutex::new(rx);
let rx = Arc::new(rx);
(Completion(tx), Barrier(rx))
static ID_COUNTER: AtomicUsize = AtomicUsize::new(0);
let shared = Arc::new(Shared {
id: ID_COUNTER.fetch_add(1, Ordering::Relaxed),
notify: Default::default(),
completions_in_progress: AtomicUsize::new(1),
});
let location = std::panic::Location::caller();
tracing::info!(id = shared.id, %location, "created");
(Completion(shared.clone()), Barrier(shared))
}

View File

@@ -23,7 +23,6 @@
//! <https://grafana.com/tutorials/build-a-panel-plugin/>
use anyhow::Result;
use pageserver::repository::Key;
use pageserver::METADATA_FILE_NAME;
use std::cmp::Ordering;
use std::io::{self, BufRead};
use std::path::PathBuf;
@@ -72,10 +71,6 @@ pub fn main() -> Result<()> {
let line = PathBuf::from_str(&line).unwrap();
let filename = line.file_name().unwrap();
let filename = filename.to_str().unwrap();
if filename == METADATA_FILE_NAME {
// Don't try and parse "metadata" like a key-lsn range
continue;
}
let range = parse_filename(filename);
ranges.push(range);
}

View File

@@ -9,10 +9,8 @@ use clap::{Arg, ArgAction, Command};
use fail::FailScenario;
use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
use pageserver::task_mgr::WALRECEIVER_RUNTIME;
use remote_storage::GenericRemoteStorage;
use tokio::time::Instant;
use tracing::*;
use metrics::set_build_info_metric;
@@ -226,19 +224,6 @@ fn start_pageserver(
launch_ts: &'static LaunchTimestamp,
conf: &'static PageServerConf,
) -> anyhow::Result<()> {
// Monotonic time for later calculating startup duration
let started_startup_at = Instant::now();
let startup_checkpoint = move |phase: &str, human_phase: &str| {
let elapsed = started_startup_at.elapsed();
let secs = elapsed.as_secs_f64();
STARTUP_DURATION.with_label_values(&[phase]).set(secs);
info!(
elapsed_ms = elapsed.as_millis(),
"{human_phase} ({secs:.3}s since start)"
)
};
// Print version and launch timestamp to the log,
// and expose them as prometheus metrics.
// A changed version string indicates changed software.
@@ -348,11 +333,6 @@ fn start_pageserver(
// Set up remote storage client
let remote_storage = create_remote_storage_client(conf)?;
// Up to this point no significant I/O has been done: this should have been fast. Record
// duration prior to starting I/O intensive phase of startup.
startup_checkpoint("initial", "Starting loading tenants");
STARTUP_IS_LOADING.set(1);
// Startup staging or optimizing:
//
// We want to minimize downtime for `page_service` connections, and trying not to overload
@@ -378,6 +358,7 @@ fn start_pageserver(
};
// Scan the local 'tenants/' directory and start loading the tenants
let init_started_at = std::time::Instant::now();
let shutdown_pageserver = tokio_util::sync::CancellationToken::new();
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
@@ -395,13 +376,18 @@ fn start_pageserver(
let guard = scopeguard::guard_on_success((), |_| tracing::info!("Cancelled before initial load completed"));
init_done_rx.wait().await;
startup_checkpoint("initial_tenant_load", "Initial load completed");
STARTUP_IS_LOADING.set(0);
// initial logical sizes can now start, as they were waiting on init_done_rx.
scopeguard::ScopeGuard::into_inner(guard);
let init_done = std::time::Instant::now();
let elapsed = init_done - init_started_at;
tracing::info!(
elapsed_millis = elapsed.as_millis(),
"Initial load completed"
);
let mut init_sizes_done = std::pin::pin!(init_logical_size_done_rx.wait());
let timeout = conf.background_task_maximum_delay;
@@ -410,7 +396,12 @@ fn start_pageserver(
let init_sizes_done = match tokio::time::timeout(timeout, &mut init_sizes_done).await {
Ok(_) => {
startup_checkpoint("initial_logical_sizes", "Initial logical sizes completed");
let now = std::time::Instant::now();
tracing::info!(
from_init_done_millis = (now - init_done).as_millis(),
from_init_millis = (now - init_started_at).as_millis(),
"Initial logical sizes completed"
);
None
}
Err(_) => {
@@ -426,7 +417,6 @@ fn start_pageserver(
// allow background jobs to start
drop(background_jobs_can_start);
startup_checkpoint("background_jobs_can_start", "Starting background jobs");
if let Some(init_sizes_done) = init_sizes_done {
// ending up here is not a bug; at the latest logical sizes will be queried by
@@ -436,11 +426,14 @@ fn start_pageserver(
scopeguard::ScopeGuard::into_inner(guard);
startup_checkpoint("initial_logical_sizes", "Initial logical sizes completed after timeout (background jobs already started)");
let now = std::time::Instant::now();
tracing::info!(
from_init_done_millis = (now - init_done).as_millis(),
from_init_millis = (now - init_started_at).as_millis(),
"Initial logical sizes completed after timeout (background jobs already started)"
);
}
startup_checkpoint("complete", "Startup complete");
};
async move {

View File

@@ -7,7 +7,7 @@ pub mod disk_usage_eviction_task;
pub mod http;
pub mod import_datadir;
pub mod keyspace;
pub mod metrics;
pub(crate) mod metrics;
pub mod page_cache;
pub mod page_service;
pub mod pgdatadir_mapping;

View File

@@ -1,9 +1,9 @@
use metrics::metric_vec_duration::DurationResultObserver;
use metrics::{
register_counter_vec, register_gauge_vec, register_histogram, register_histogram_vec,
register_int_counter, register_int_counter_vec, register_int_gauge, register_int_gauge_vec,
register_uint_gauge, register_uint_gauge_vec, Counter, CounterVec, GaugeVec, Histogram,
HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec,
register_counter_vec, register_histogram, register_histogram_vec, register_int_counter,
register_int_counter_vec, register_int_gauge, register_int_gauge_vec, register_uint_gauge,
register_uint_gauge_vec, Counter, CounterVec, Histogram, HistogramVec, IntCounter,
IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec,
};
use once_cell::sync::Lazy;
use strum::VariantNames;
@@ -394,35 +394,6 @@ pub(crate) static UNEXPECTED_ONDEMAND_DOWNLOADS: Lazy<IntCounter> = Lazy::new(||
.expect("failed to define a metric")
});
/// How long did we take to start up? Broken down by labels to describe
/// different phases of startup.
pub static STARTUP_DURATION: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
"pageserver_startup_duration_seconds",
"Time taken by phases of pageserver startup, in seconds",
&["phase"]
)
.expect("Failed to register pageserver_startup_duration_seconds metric")
});
pub static STARTUP_IS_LOADING: Lazy<UIntGauge> = Lazy::new(|| {
register_uint_gauge!(
"pageserver_startup_is_loading",
"1 while in initial startup load of tenants, 0 at other times"
)
.expect("Failed to register pageserver_startup_is_loading")
});
/// How long did tenants take to go from construction to active state?
pub(crate) static TENANT_ACTIVATION: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_tenant_activation_seconds",
"Time taken by tenants to activate, in seconds",
CRITICAL_OP_BUCKETS.into()
)
.expect("Failed to register pageserver_tenant_activation_seconds metric")
});
/// Each `Timeline`'s [`EVICTIONS_WITH_LOW_RESIDENCE_DURATION`] metric.
#[derive(Debug)]
pub struct EvictionsWithLowResidenceDuration {

View File

@@ -56,7 +56,6 @@ use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::import_datadir;
use crate::is_uninit_mark;
use crate::metrics::TENANT_ACTIVATION;
use crate::metrics::{remove_tenant_metrics, TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC};
use crate::repository::GcResult;
use crate::task_mgr;
@@ -600,7 +599,10 @@ impl Tenant {
debug!("successfully downloaded index part for timeline {timeline_id}");
match index_part {
MaybeDeletedIndexPart::IndexPart(index_part) => {
timeline_ancestors.insert(timeline_id, index_part.metadata.clone());
timeline_ancestors.insert(
timeline_id,
index_part.parse_metadata().context("parse_metadata")?,
);
remote_index_and_client.insert(timeline_id, (index_part, client));
}
MaybeDeletedIndexPart::Deleted(_) => {
@@ -1125,7 +1127,10 @@ impl Tenant {
}
};
let remote_metadata = index_part.metadata.clone();
let remote_metadata = index_part
.parse_metadata()
.context("parse_metadata")
.map_err(LoadLocalTimelineError::Load)?;
(
Some(RemoteStartupData {
index_part,
@@ -1634,8 +1639,6 @@ impl Tenant {
post_state = <&'static str>::from(&*current_state),
"activation attempt finished"
);
TENANT_ACTIVATION.observe(elapsed.as_secs_f64());
});
}
}

View File

@@ -12,7 +12,7 @@ use std::fs::{File, OpenOptions};
use std::io::{self, Write};
use anyhow::{bail, ensure, Context};
use serde::{de::Error, Deserialize, Serialize, Serializer};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tracing::info_span;
use utils::bin_ser::SerializeError;
@@ -232,28 +232,6 @@ impl TimelineMetadata {
}
}
impl<'de> Deserialize<'de> for TimelineMetadata {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let bytes = Vec::<u8>::deserialize(deserializer)?;
Self::from_bytes(bytes.as_slice()).map_err(|e| D::Error::custom(format!("{}", e)))
}
}
impl Serialize for TimelineMetadata {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let bytes = self
.to_bytes()
.map_err(|e| serde::ser::Error::custom(format!("{}", e)))?;
bytes.serialize(serializer)
}
}
/// Save timeline metadata to file
pub fn save_metadata(
conf: &'static PageServerConf,

View File

@@ -534,7 +534,8 @@ impl RemoteTimelineClient {
// ahead of what's _actually_ on the remote during index upload.
upload_queue.latest_metadata = metadata.clone();
self.schedule_index_upload(upload_queue, upload_queue.latest_metadata.clone());
let metadata_bytes = upload_queue.latest_metadata.to_bytes()?;
self.schedule_index_upload(upload_queue, metadata_bytes);
Ok(())
}
@@ -554,7 +555,8 @@ impl RemoteTimelineClient {
let upload_queue = guard.initialized_mut()?;
if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
self.schedule_index_upload(upload_queue, upload_queue.latest_metadata.clone());
let metadata_bytes = upload_queue.latest_metadata.to_bytes()?;
self.schedule_index_upload(upload_queue, metadata_bytes);
}
Ok(())
@@ -564,7 +566,7 @@ impl RemoteTimelineClient {
fn schedule_index_upload(
self: &Arc<Self>,
upload_queue: &mut UploadQueueInitialized,
metadata: TimelineMetadata,
metadata_bytes: Vec<u8>,
) {
info!(
"scheduling metadata upload with {} files ({} changed)",
@@ -574,7 +576,11 @@ impl RemoteTimelineClient {
let disk_consistent_lsn = upload_queue.latest_metadata.disk_consistent_lsn();
let index_part = IndexPart::new(upload_queue.latest_files.clone(), metadata);
let index_part = IndexPart::new(
upload_queue.latest_files.clone(),
disk_consistent_lsn,
metadata_bytes,
);
let op = UploadOp::UploadMetadata(index_part, disk_consistent_lsn);
self.calls_unfinished_metric_begin(&op);
upload_queue.queued_operations.push_back(op);
@@ -629,7 +635,7 @@ impl RemoteTimelineClient {
// Deleting layers doesn't affect the values stored in TimelineMetadata,
// so we don't need update it. Just serialize it.
let metadata = upload_queue.latest_metadata.clone();
let metadata_bytes = upload_queue.latest_metadata.to_bytes()?;
// Update the remote index file, removing the to-be-deleted files from the index,
// before deleting the actual files.
@@ -645,7 +651,7 @@ impl RemoteTimelineClient {
}
if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
self.schedule_index_upload(upload_queue, metadata);
self.schedule_index_upload(upload_queue, metadata_bytes);
}
// schedule the actual deletions
@@ -1536,17 +1542,14 @@ mod tests {
};
assert_file_list(
&index_part
.layer_metadata
.keys()
.map(|f| f.to_owned())
.collect(),
&index_part.timeline_layers,
&[
&layer_file_name_1.file_name(),
&layer_file_name_2.file_name(),
],
);
assert_eq!(index_part.metadata, metadata);
let downloaded_metadata = index_part.parse_metadata()?;
assert_eq!(downloaded_metadata, metadata);
// Schedule upload and then a deletion. Check that the deletion is queued
let content_baz = dummy_contents("baz");

View File

@@ -259,19 +259,13 @@ pub(super) async fn download_index_part(
)
.await?;
let decode_result = serde_json::from_slice::<IndexPart>(&index_part_bytes)
let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
.with_context(|| {
format!("Failed to deserialize index part file into file {index_part_path:?}")
})
.map_err(DownloadError::Other);
.map_err(DownloadError::Other)?;
// Peek at the result, and log the original bytes if they failed to decode
if decode_result.is_err() {
let index_str = String::from_utf8_lossy(index_part_bytes.as_slice());
warn!("Corrupt index bytes: {index_str}");
}
decode_result
Ok(index_part)
}
///

View File

@@ -5,15 +5,16 @@
use std::collections::{HashMap, HashSet};
use chrono::NaiveDateTime;
use serde::ser::SerializeStruct;
use serde::{Deserialize, Serialize, Serializer};
use serde_with::serde_as;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::bin_ser::SerializeError;
use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::storage_layer::LayerFileName;
use crate::tenant::upload_queue::UploadQueueInitialized;
use utils::lsn::Lsn;
/// Metadata gathered for each of the layer files.
///
/// Fields have to be `Option`s because remote [`IndexPart`]'s can be from different version, which
@@ -50,82 +51,33 @@ impl LayerFileMetadata {
///
/// This type needs to be backwards and forwards compatible. When changing the fields,
/// remember to add a test case for the changed version.
#[derive(Debug, PartialEq, Eq, Clone)]
#[serde_as]
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct IndexPart {
/// Debugging aid describing the version of this type.
#[serde(default)]
version: usize,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub deleted_at: Option<NaiveDateTime>,
/// Layer names, which are stored on the remote storage.
///
/// Additional metadata can might exist in `layer_metadata`.
pub timeline_layers: HashSet<LayerFileName>,
/// Per layer file name metadata, which can be present for a present or missing layer file.
///
/// Older versions of `IndexPart` will not have this property or have only a part of metadata
/// that latest version stores.
pub layer_metadata: HashMap<LayerFileName, IndexLayerMetadata>,
pub metadata: TimelineMetadata,
}
impl<'de> Deserialize<'de> for IndexPart {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
// Declaring a struct is simpler that implementing a Visitor to handle decoding
// a JSON struct while ignoring fields we don't care about.
#[serde_as]
#[derive(Deserialize)]
struct SerializedIndexPart {
#[serde(default)]
version: usize,
#[serde(default)]
deleted_at: Option<NaiveDateTime>,
layer_metadata: HashMap<LayerFileName, IndexLayerMetadata>,
metadata_bytes: TimelineMetadata,
}
let inner = SerializedIndexPart::deserialize(deserializer)?;
Ok(IndexPart {
version: inner.version,
deleted_at: inner.deleted_at,
layer_metadata: inner.layer_metadata,
metadata: inner.metadata_bytes,
})
}
}
impl Serialize for IndexPart {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut state = serializer.serialize_struct("IndexPart", 6)?;
state.serialize_field("version", &(self.version as u32))?;
// Forward compat: write out this field only so that v2 readers can read
// the v3 structure. This could be written more efficiently but this forward
// compat code will go away in the near future.
let timeline_layers: HashSet<LayerFileName> =
self.layer_metadata.keys().map(|k| k.to_owned()).collect();
state.serialize_field("timeline_layers", &timeline_layers)?;
state.serialize_field("deleted_at", &self.deleted_at)?;
state.serialize_field("layer_metadata", &self.layer_metadata)?;
let metadata_bytes = self.metadata.to_bytes().map_err(|e| {
serde::ser::Error::custom(format!("Unserializable IndexPart metadata: {e}"))
})?;
state.serialize_field("metadata_bytes", &metadata_bytes)?;
// This field is written out for convenience of human readers, but is
// not read back in deserialization
state.serialize_field(
"disk_consistent_lsn",
&format!("{}", self.metadata.disk_consistent_lsn()),
)?;
state.end()
}
// 'disk_consistent_lsn' is a copy of the 'disk_consistent_lsn' in the metadata.
// It's duplicated here for convenience.
#[serde_as(as = "DisplayFromStr")]
pub disk_consistent_lsn: Lsn,
metadata_bytes: Vec<u8>,
}
impl IndexPart {
@@ -138,30 +90,44 @@ impl IndexPart {
pub fn new(
layers_and_metadata: HashMap<LayerFileName, LayerFileMetadata>,
metadata: TimelineMetadata,
disk_consistent_lsn: Lsn,
metadata_bytes: Vec<u8>,
) -> Self {
let mut timeline_layers = HashSet::with_capacity(layers_and_metadata.len());
let mut layer_metadata = HashMap::with_capacity(layers_and_metadata.len());
for (remote_name, metadata) in layers_and_metadata {
layer_metadata.insert(remote_name.to_owned(), IndexLayerMetadata::from(metadata));
for (remote_name, metadata) in &layers_and_metadata {
timeline_layers.insert(remote_name.to_owned());
let metadata = IndexLayerMetadata::from(metadata);
layer_metadata.insert(remote_name.to_owned(), metadata);
}
Self {
version: Self::LATEST_VERSION,
timeline_layers,
layer_metadata,
metadata,
disk_consistent_lsn,
metadata_bytes,
deleted_at: None,
}
}
pub fn parse_metadata(&self) -> anyhow::Result<TimelineMetadata> {
TimelineMetadata::from_bytes(&self.metadata_bytes)
}
}
impl TryFrom<&UploadQueueInitialized> for IndexPart {
type Error = SerializeError;
fn try_from(upload_queue: &UploadQueueInitialized) -> Result<Self, Self::Error> {
let disk_consistent_lsn = upload_queue.latest_metadata.disk_consistent_lsn();
let metadata_bytes = upload_queue.latest_metadata.to_bytes()?;
Ok(Self::new(
upload_queue.latest_files.clone(),
upload_queue.latest_metadata.clone(),
disk_consistent_lsn,
metadata_bytes,
))
}
}
@@ -172,8 +138,8 @@ pub struct IndexLayerMetadata {
pub(super) file_size: u64,
}
impl From<LayerFileMetadata> for IndexLayerMetadata {
fn from(other: LayerFileMetadata) -> Self {
impl From<&'_ LayerFileMetadata> for IndexLayerMetadata {
fn from(other: &'_ LayerFileMetadata) -> Self {
IndexLayerMetadata {
file_size: other.file_size,
}
@@ -186,48 +152,21 @@ mod tests {
#[test]
fn v1_indexpart_is_parsed() {
let metadata_bytes: Vec<u8> = [
113, 11, 159, 210, 0, 54, 0, 4, 0, 0, 0, 0, 1, 105, 96, 232, 1, 0, 0, 0, 0, 1, 105, 96,
112, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 105, 96, 112, 0, 0, 0, 0, 1, 105, 96,
112, 0, 0, 0, 14, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
]
.to_vec();
let metadata_bytes_str = serde_json::to_string(&metadata_bytes).unwrap();
let example = format!(
r#"{{
let example = r#"{
"version":1,
"timeline_layers":[
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9",
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51"
],
"layer_metadata":{{
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": {{ "file_size": 25600000 }},
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": {{ "file_size": 9007199254741001 }}
}},
"timeline_layers":["000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9"],
"layer_metadata":{
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 },
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001 }
},
"disk_consistent_lsn":"0/16960E8",
"metadata_bytes":{metadata_bytes_str}
}}"#
);
"metadata_bytes":[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
}"#;
let expected = IndexPart {
// note this is not verified, could be anything, but exists for humans debugging.. could be the git version instead?
version: 1,
timeline_layers: HashSet::from(["000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap()]),
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
file_size: 25600000,
@@ -238,61 +177,71 @@ mod tests {
file_size: 9007199254741001,
})
]),
metadata: TimelineMetadata::from_bytes(&metadata_bytes).unwrap(),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
metadata_bytes: [113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0].to_vec(),
deleted_at: None,
};
let part = serde_json::from_str::<IndexPart>(&example).unwrap();
let part = serde_json::from_str::<IndexPart>(example).unwrap();
assert_eq!(part, expected);
}
#[test]
fn v1_indexpart_is_parsed_with_optional_missing_layers() {
let example = r#"{
"version":1,
"timeline_layers":["000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9"],
"missing_layers":["This shouldn't fail deserialization"],
"layer_metadata":{
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 },
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001 }
},
"disk_consistent_lsn":"0/16960E8",
"metadata_bytes":[112,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
}"#;
let expected = IndexPart {
// note this is not verified, could be anything, but exists for humans debugging.. could be the git version instead?
version: 1,
timeline_layers: HashSet::from(["000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap()]),
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
file_size: 25600000,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
metadata_bytes: [112,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0].to_vec(),
deleted_at: None,
};
let part = serde_json::from_str::<IndexPart>(example).unwrap();
assert_eq!(part, expected);
}
#[test]
fn v2_indexpart_is_parsed_with_deleted_at() {
let metadata_bytes: Vec<u8> = [
136, 151, 49, 208, 0, 70, 0, 4, 0, 0, 0, 0, 2, 83, 38, 72, 1, 0, 0, 0, 0, 2, 83, 38,
32, 1, 87, 198, 240, 135, 97, 119, 45, 125, 38, 29, 155, 161, 140, 141, 255, 210, 0, 0,
0, 0, 2, 83, 38, 72, 0, 0, 0, 0, 1, 73, 240, 192, 0, 0, 0, 0, 1, 73, 240, 192, 0, 0, 0,
15, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
]
.to_vec();
let metadata_bytes_str = serde_json::to_string(&metadata_bytes).unwrap();
let example = format!(
r#"{{
let example = r#"{
"version":2,
"timeline_layers":[
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9",
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51"
],
"timeline_layers":["000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9"],
"missing_layers":["This shouldn't fail deserialization"],
"layer_metadata":{{
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": {{ "file_size": 25600000 }},
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": {{ "file_size": 9007199254741001 }}
}},
"layer_metadata":{
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 },
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001 }
},
"disk_consistent_lsn":"0/16960E8",
"metadata_bytes":{metadata_bytes_str},
"metadata_bytes":[112,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
"deleted_at": "2023-07-31T09:00:00.123"
}}"#
);
}"#;
let expected = IndexPart {
// note this is not verified, could be anything, but exists for humans debugging.. could be the git version instead?
version: 2,
timeline_layers: HashSet::from(["000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap()]),
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
file_size: 25600000,
@@ -303,131 +252,58 @@ mod tests {
file_size: 9007199254741001,
})
]),
metadata: TimelineMetadata::from_bytes(&metadata_bytes).unwrap(),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
metadata_bytes: [112,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0].to_vec(),
deleted_at: Some(chrono::NaiveDateTime::parse_from_str(
"2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap())
};
let part = serde_json::from_str::<IndexPart>(&example).unwrap();
assert_eq!(part, expected);
// Validate that when we write out, we are writing the same v2 format that older pageservers
// will understand
let reserialized = serde_json::to_string(&part).unwrap();
// We do not expect exact symmetry, but the reserialized version should include the legacy fields that
// v2 requires, and not be limited to just the fields that are in the runtime IndexPart
assert!(reserialized.contains("layer_metadata"));
assert!(reserialized.contains("disk_consistent_lsn"));
// The missing_layers attribute is not required
assert!(!reserialized.contains("missing_layers"));
}
#[test]
fn v3_indexpart_is_parsed() {
let metadata_bytes: Vec<u8> = [
136, 151, 49, 208, 0, 70, 0, 4, 0, 0, 0, 0, 2, 83, 38, 72, 1, 0, 0, 0, 0, 2, 83, 38,
32, 1, 87, 198, 240, 135, 97, 119, 45, 125, 38, 29, 155, 161, 140, 141, 255, 210, 0, 0,
0, 0, 2, 83, 38, 72, 0, 0, 0, 0, 1, 73, 240, 192, 0, 0, 0, 0, 1, 73, 240, 192, 0, 0, 0,
15, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
]
.to_vec();
let metadata_bytes_str = serde_json::to_string(&metadata_bytes).unwrap();
let example = format!(
r#"{{
"version":3,
"layer_metadata":{{
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": {{ "file_size": 25600000 }},
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": {{ "file_size": 9007199254741001 }}
}},
"disk_consistent_lsn":"0/16960E8",
"metadata_bytes":{metadata_bytes_str},
"deleted_at": "2023-07-31T09:00:00.123"
}}"#
);
let expected = IndexPart {
version: 3,
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
file_size: 25600000,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
})
]),
metadata: TimelineMetadata::from_bytes(metadata_bytes.as_slice()).unwrap(),
deleted_at: Some(chrono::NaiveDateTime::parse_from_str(
"2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap())
};
let part = serde_json::from_str::<IndexPart>(&example).unwrap();
let part = serde_json::from_str::<IndexPart>(example).unwrap();
assert_eq!(part, expected);
}
#[test]
fn empty_layers_are_parsed() {
let metadata_bytes: Vec<u8> = [
136, 151, 49, 208, 0, 70, 0, 4, 0, 0, 0, 0, 2, 83, 38, 72, 1, 0, 0, 0, 0, 2, 83, 38,
32, 1, 87, 198, 240, 135, 97, 119, 45, 125, 38, 29, 155, 161, 140, 141, 255, 210, 0, 0,
0, 0, 2, 83, 38, 72, 0, 0, 0, 0, 1, 73, 240, 192, 0, 0, 0, 0, 1, 73, 240, 192, 0, 0, 0,
15, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
]
.to_vec();
let metadata_bytes_str = serde_json::to_string(&metadata_bytes).unwrap();
let empty_layers_json = format!(
r#"{{
let empty_layers_json = r#"{
"version":1,
"timeline_layers":[],
"layer_metadata":{{}},
"layer_metadata":{},
"disk_consistent_lsn":"0/2532648",
"metadata_bytes":{metadata_bytes_str}
}}"#
);
"metadata_bytes":[136,151,49,208,0,70,0,4,0,0,0,0,2,83,38,72,1,0,0,0,0,2,83,38,32,1,87,198,240,135,97,119,45,125,38,29,155,161,140,141,255,210,0,0,0,0,2,83,38,72,0,0,0,0,1,73,240,192,0,0,0,0,1,73,240,192,0,0,0,15,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
}"#;
let expected = IndexPart {
version: 1,
timeline_layers: HashSet::new(),
layer_metadata: HashMap::new(),
metadata: TimelineMetadata::from_bytes(&metadata_bytes).unwrap(),
disk_consistent_lsn: "0/2532648".parse::<Lsn>().unwrap(),
metadata_bytes: [
136, 151, 49, 208, 0, 70, 0, 4, 0, 0, 0, 0, 2, 83, 38, 72, 1, 0, 0, 0, 0, 2, 83,
38, 32, 1, 87, 198, 240, 135, 97, 119, 45, 125, 38, 29, 155, 161, 140, 141, 255,
210, 0, 0, 0, 0, 2, 83, 38, 72, 0, 0, 0, 0, 1, 73, 240, 192, 0, 0, 0, 0, 1, 73,
240, 192, 0, 0, 0, 15, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0,
]
.to_vec(),
deleted_at: None,
};
let empty_layers_parsed =
serde_json::from_str::<IndexPart>(empty_layers_json.as_str()).unwrap();
let empty_layers_parsed = serde_json::from_str::<IndexPart>(empty_layers_json).unwrap();
assert_eq!(empty_layers_parsed, expected);
}

View File

@@ -1722,7 +1722,7 @@ impl Timeline {
let mut corrupted_local_layers = Vec::new();
let mut added_remote_layers = Vec::new();
for remote_layer_name in index_part.layer_metadata.keys() {
for remote_layer_name in &index_part.timeline_layers {
let local_layer = local_only_layers.remove(remote_layer_name);
let remote_layer_metadata = index_part
@@ -1877,7 +1877,7 @@ impl Timeline {
Some(index_part) => {
info!(
"initializing upload queue from remote index with {} layer files",
index_part.layer_metadata.len()
index_part.timeline_layers.len()
);
remote_client.init_upload_queue(index_part)?;
self.create_remote_layers(index_part, local_layers, disk_consistent_lsn)
@@ -3534,7 +3534,7 @@ impl Timeline {
}
// The current stdlib sorting implementation is designed in a way where it is
// particularly fast where the slice is made up of sorted sub-ranges.
all_value_refs.sort_by_key(|(key, lsn, _value_ref)| (*key, *lsn));
all_value_refs.sort_by_key(|(key, _lsn, _value_ref)| *key);
let mut all_keys = Vec::new();
for l in deltas_to_compact.iter() {
@@ -3550,7 +3550,7 @@ impl Timeline {
}
// The current stdlib sorting implementation is designed in a way where it is
// particularly fast where the slice is made up of sorted sub-ranges.
all_keys.sort_by_key(|(key, lsn, _size)| (*key, *lsn));
all_keys.sort_by_key(|(key, _lsn, _size)| *key);
for (next_key, _next_lsn, _size) in all_keys.iter() {
let next_key = *next_key;

View File

@@ -38,10 +38,7 @@ use utils::{
lsn::Lsn,
};
use super::{
walreceiver_connection::WalConnectionStatus, walreceiver_connection::WalReceiverError,
TaskEvent, TaskHandle,
};
use super::{walreceiver_connection::WalConnectionStatus, TaskEvent, TaskHandle};
/// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker.
/// Based on the updates, desides whether to start, keep or stop a WAL receiver task.
@@ -422,19 +419,13 @@ impl ConnectionManagerState {
match res {
Ok(()) => Ok(()),
Err(e) => {
match e {
WalReceiverError::SuccessfulCompletion(msg) => {
info!("walreceiver connection handling ended with success: {msg}");
Ok(())
}
WalReceiverError::ExpectedSafekeeperError(e) => {
info!("walreceiver connection handling ended: {e}");
Ok(())
}
WalReceiverError::Other(e) => {
// give out an error to have task_mgr give it a really verbose logging
Err(e).context("walreceiver connection handling failure")
}
use super::walreceiver_connection::ExpectedError;
if e.is_expected() {
info!("walreceiver connection handling ended: {e:#}");
Ok(())
} else {
// give out an error to have task_mgr give it a really verbose logging
Err(e).context("walreceiver connection handling failure")
}
}
}

View File

@@ -8,14 +8,14 @@ use std::{
time::{Duration, SystemTime},
};
use anyhow::{anyhow, Context};
use anyhow::{bail, ensure, Context};
use bytes::BytesMut;
use chrono::{NaiveDateTime, Utc};
use fail::fail_point;
use futures::StreamExt;
use postgres::{error::SqlState, SimpleQueryMessage, SimpleQueryRow};
use postgres_ffi::v14::xlog_utils::normalize_lsn;
use postgres_ffi::WAL_SEGMENT_SIZE;
use postgres_ffi::{v14::xlog_utils::normalize_lsn, waldecoder::WalDecodeError};
use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
use tokio::{select, sync::watch, time};
@@ -60,50 +60,6 @@ pub(super) struct WalConnectionStatus {
pub node: NodeId,
}
pub(super) enum WalReceiverError {
/// An error of a type that does not indicate an issue, e.g. a connection closing
ExpectedSafekeeperError(postgres::Error),
/// An "error" message that carries a SUCCESSFUL_COMPLETION status code. Carries
/// the message part of the original postgres error
SuccessfulCompletion(String),
/// Generic error
Other(anyhow::Error),
}
impl From<tokio_postgres::Error> for WalReceiverError {
fn from(err: tokio_postgres::Error) -> Self {
if let Some(dberror) = err.as_db_error().filter(|db_error| {
db_error.code() == &SqlState::SUCCESSFUL_COMPLETION
&& db_error.message().contains("ending streaming")
}) {
// Strip the outer DbError, which carries a misleading "error" severity
Self::SuccessfulCompletion(dberror.message().to_string())
} else if err.is_closed()
|| err
.source()
.and_then(|source| source.downcast_ref::<std::io::Error>())
.map(is_expected_io_error)
.unwrap_or(false)
{
Self::ExpectedSafekeeperError(err)
} else {
Self::Other(anyhow::Error::new(err))
}
}
}
impl From<anyhow::Error> for WalReceiverError {
fn from(err: anyhow::Error) -> Self {
Self::Other(err)
}
}
impl From<WalDecodeError> for WalReceiverError {
fn from(err: WalDecodeError) -> Self {
Self::Other(anyhow::Error::new(err))
}
}
/// Open a connection to the given safekeeper and receive WAL, sending back progress
/// messages as we go.
pub(super) async fn handle_walreceiver_connection(
@@ -114,7 +70,7 @@ pub(super) async fn handle_walreceiver_connection(
connect_timeout: Duration,
ctx: RequestContext,
node: NodeId,
) -> Result<(), WalReceiverError> {
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_and_timeline_id();
WALRECEIVER_STARTED_CONNECTIONS.inc();
@@ -174,15 +130,11 @@ pub(super) async fn handle_walreceiver_connection(
connection_result = connection => match connection_result {
Ok(()) => debug!("Walreceiver db connection closed"),
Err(connection_error) => {
match WalReceiverError::from(connection_error) {
WalReceiverError::ExpectedSafekeeperError(_) => {
// silence, because most likely we've already exited the outer call
// with a similar error.
},
WalReceiverError::SuccessfulCompletion(_) => {}
WalReceiverError::Other(err) => {
warn!("Connection aborted: {err:#}")
}
if connection_error.is_expected() {
// silence, because most likely we've already exited the outer call
// with a similar error.
} else {
warn!("Connection aborted: {connection_error:#}")
}
}
},
@@ -228,7 +180,7 @@ pub(super) async fn handle_walreceiver_connection(
let mut startpoint = last_rec_lsn;
if startpoint == Lsn(0) {
return Err(WalReceiverError::Other(anyhow!("No previous WAL position")));
bail!("No previous WAL position");
}
// There might be some padding after the last full record, skip it.
@@ -310,9 +262,7 @@ pub(super) async fn handle_walreceiver_connection(
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
// at risk of hitting a deadlock.
if !lsn.is_aligned() {
return Err(WalReceiverError::Other(anyhow!("LSN not aligned")));
}
ensure!(lsn.is_aligned());
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
@@ -469,3 +419,51 @@ async fn identify_system(client: &mut Client) -> anyhow::Result<IdentifySystem>
Err(IdentifyError.into())
}
}
/// Trait for avoid reporting walreceiver specific expected or "normal" or "ok" errors.
pub(super) trait ExpectedError {
/// Test if this error is an ok error.
///
/// We don't want to report connectivity problems as real errors towards connection manager because
/// 1. they happen frequently enough to make server logs hard to read and
/// 2. the connection manager can retry other safekeeper.
///
/// If this function returns `true`, it's such an error.
/// The caller should log it at info level and then report to connection manager that we're done handling this connection.
/// Connection manager will then handle reconnections.
///
/// If this function returns an `false` the error should be propagated and the connection manager
/// will log the error at ERROR level.
fn is_expected(&self) -> bool;
}
impl ExpectedError for postgres::Error {
fn is_expected(&self) -> bool {
self.is_closed()
|| self
.source()
.and_then(|source| source.downcast_ref::<std::io::Error>())
.map(is_expected_io_error)
.unwrap_or(false)
|| self
.as_db_error()
.filter(|db_error| {
db_error.code() == &SqlState::SUCCESSFUL_COMPLETION
&& db_error.message().contains("ending streaming")
})
.is_some()
}
}
impl ExpectedError for anyhow::Error {
fn is_expected(&self) -> bool {
let head = self.downcast_ref::<postgres::Error>();
let tail = self
.chain()
.filter_map(|e| e.downcast_ref::<postgres::Error>());
// check if self or any of the chained/sourced errors are expected
head.into_iter().chain(tail).any(|e| e.is_expected())
}
}

View File

@@ -140,24 +140,36 @@ impl UploadQueue {
}
}
let mut files = HashMap::with_capacity(index_part.layer_metadata.len());
for (layer_name, layer_metadata) in &index_part.layer_metadata {
files.insert(
layer_name.to_owned(),
LayerFileMetadata::from(layer_metadata),
);
let mut files = HashMap::with_capacity(index_part.timeline_layers.len());
for layer_name in &index_part.timeline_layers {
match index_part
.layer_metadata
.get(layer_name)
.map(LayerFileMetadata::from)
{
Some(layer_metadata) => {
files.insert(layer_name.to_owned(), layer_metadata);
}
None => {
anyhow::bail!(
"No remote layer metadata found for layer {}",
layer_name.file_name()
);
}
}
}
let index_part_metadata = index_part.parse_metadata()?;
info!(
"initializing upload queue with remote index_part.disk_consistent_lsn: {}",
index_part.metadata.disk_consistent_lsn()
index_part_metadata.disk_consistent_lsn()
);
let state = UploadQueueInitialized {
latest_files: files,
latest_files_changes_since_metadata_upload_scheduled: 0,
latest_metadata: index_part.metadata.clone(),
last_uploaded_consistent_lsn: index_part.metadata.disk_consistent_lsn(),
latest_metadata: index_part_metadata.clone(),
last_uploaded_consistent_lsn: index_part_metadata.disk_consistent_lsn(),
// what follows are boring default initializations
task_counter: 0,
num_inprogress_layer_uploads: 0,

View File

@@ -172,7 +172,7 @@ lfc_change_limit_hook(int newval, void *extra)
{
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
if (lfc_desc < 0) {
elog(WARNING, "Failed to open file cache %s: %m, disabling file cache", lfc_path);
elog(LOG, "Failed to open file cache %s: %m", lfc_path);
lfc_size_limit = 0; /* disable file cache */
return;
}
@@ -557,7 +557,7 @@ lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
Assert(victim->access_count == 0);
entry->offset = victim->offset; /* grab victim's chunk */
hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL);
elog(DEBUG2, "Swap file cache page");
elog(LOG, "Swap file cache page");
}
else
{
@@ -574,7 +574,7 @@ lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
{
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
if (lfc_desc < 0) {
elog(WARNING, "Failed to open file cache %s: %m, disabling file cache", lfc_path);
elog(LOG, "Failed to open file cache %s: %m", lfc_path);
lfc_size_limit = 0; /* disable file cache */
}
}
@@ -583,7 +583,7 @@ lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
rc = pwrite(lfc_desc, buffer, BLCKSZ, ((off_t)entry->offset*BLOCKS_PER_CHUNK + chunk_offs)*BLCKSZ);
if (rc != BLCKSZ)
{
elog(WARNING, "Failed to write file cache: %m, disabling file cache");
elog(INFO, "Failed to write file cache: %m");
lfc_size_limit = 0; /* disable file cache */
}
}

View File

@@ -79,10 +79,6 @@ struct Args {
/// Listen http endpoint for management and metrics in the form host:port.
#[arg(long, default_value = DEFAULT_HTTP_LISTEN_ADDR)]
listen_http: String,
/// Advertised endpoint for receiving/sending WAL in the form host:port. If not
/// specified, listen_pg is used to advertise instead.
#[arg(long, default_value = None)]
advertise_pg: Option<String>,
/// Availability zone of the safekeeper.
#[arg(long)]
availability_zone: Option<String>,
@@ -189,7 +185,6 @@ async fn main() -> anyhow::Result<()> {
listen_pg_addr: args.listen_pg,
listen_pg_addr_tenant_only: args.listen_pg_tenant_only,
listen_http_addr: args.listen_http,
advertise_pg_addr: args.advertise_pg,
availability_zone: args.availability_zone,
no_sync: args.no_sync,
broker_endpoint: args.broker_endpoint,

View File

@@ -55,7 +55,6 @@ pub struct SafeKeeperConf {
pub listen_pg_addr: String,
pub listen_pg_addr_tenant_only: Option<String>,
pub listen_http_addr: String,
pub advertise_pg_addr: Option<String>,
pub availability_zone: Option<String>,
pub no_sync: bool,
pub broker_endpoint: Uri,
@@ -89,7 +88,6 @@ impl SafeKeeperConf {
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_pg_addr_tenant_only: None,
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
advertise_pg_addr: None,
availability_zone: None,
remote_storage: None,
my_id: NodeId(0),

View File

@@ -568,9 +568,6 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
{
if self.tli.should_walsender_stop(remote_consistent_lsn).await {
// Terminate if there is nothing more to send.
// Note that "ending streaming" part of the string is used by
// pageserver to identify WalReceiverError::SuccessfulCompletion,
// do not change this string without updating pageserver.
return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
"ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
self.appname, self.start_pos,

View File

@@ -237,10 +237,7 @@ impl SharedState {
commit_lsn: self.sk.inmem.commit_lsn.0,
remote_consistent_lsn: remote_consistent_lsn.0,
peer_horizon_lsn: self.sk.inmem.peer_horizon_lsn.0,
safekeeper_connstr: conf
.advertise_pg_addr
.to_owned()
.unwrap_or(conf.listen_pg_addr.clone()),
safekeeper_connstr: conf.listen_pg_addr.clone(),
backup_lsn: self.sk.inmem.backup_lsn.0,
local_start_lsn: self.sk.state.local_start_lsn.0,
availability_zone: conf.availability_zone.clone(),

View File

@@ -86,6 +86,19 @@ DEFAULT_OUTPUT_DIR: str = "test_output"
DEFAULT_BRANCH_NAME: str = "main"
BASE_PORT: int = 15000
WORKER_PORT_NUM: int = 1000
def pytest_configure(config: Config):
"""
Check that we do not overflow available ports range.
"""
numprocesses = config.getoption("numprocesses")
if (
numprocesses is not None and BASE_PORT + numprocesses * WORKER_PORT_NUM > 32768
): # do not use ephemeral ports
raise Exception("Too many workers configured. Cannot distribute ports for services.")
@pytest.fixture(scope="session")
@@ -187,11 +200,6 @@ def shareable_scope(fixture_name: str, config: Config) -> Literal["session", "fu
return scope
@pytest.fixture(scope="session")
def worker_port_num():
return (32768 - BASE_PORT) // int(os.environ.get("PYTEST_XDIST_WORKER_COUNT", "1"))
@pytest.fixture(scope="session")
def worker_seq_no(worker_id: str) -> int:
# worker_id is a pytest-xdist fixture
@@ -204,10 +212,10 @@ def worker_seq_no(worker_id: str) -> int:
@pytest.fixture(scope="session")
def worker_base_port(worker_seq_no: int, worker_port_num: int) -> int:
# so we divide ports in ranges of ports
def worker_base_port(worker_seq_no: int) -> int:
# so we divide ports in ranges of 100 ports
# so workers have disjoint set of ports for services
return BASE_PORT + worker_seq_no * worker_port_num
return BASE_PORT + worker_seq_no * WORKER_PORT_NUM
def get_dir_size(path: str) -> int:
@@ -221,8 +229,8 @@ def get_dir_size(path: str) -> int:
@pytest.fixture(scope="session")
def port_distributor(worker_base_port: int, worker_port_num: int) -> PortDistributor:
return PortDistributor(base_port=worker_base_port, port_number=worker_port_num)
def port_distributor(worker_base_port: int) -> PortDistributor:
return PortDistributor(base_port=worker_base_port, port_number=WORKER_PORT_NUM)
@pytest.fixture(scope="session")
@@ -1486,6 +1494,7 @@ class NeonPageserver(PgProtocol):
# FIXME: replication patch for tokio_postgres regards any but CopyDone/CopyData message in CopyBoth stream as unexpected
".*Connection aborted: unexpected message from server*",
".*kill_and_wait_impl.*: wait successful.*",
".*: db error:.*ending streaming to Some.*",
".*query handler for 'pagestream.*failed: Broken pipe.*", # pageserver notices compute shut down
".*query handler for 'pagestream.*failed: Connection reset by peer.*", # pageserver notices compute shut down
# safekeeper connection can fail with this, in the window between timeline creation

View File

@@ -33,4 +33,4 @@ def test_duplicate_layers(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
time.sleep(10) # let compaction to be performed
assert env.pageserver.log_contains("compact-level0-phase1-return-same")
pg_bin.run_capture(["pgbench", "-P1", "-N", "-c5", "-T200", "-Mprepared", connstr])
pg_bin.run_capture(["pgbench", "-P1", "-N", "-c5", "-T500", "-Mprepared", connstr])

View File

@@ -12,7 +12,6 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
env.neon_cli.create_branch("test_pageserver_restart")
endpoint = env.endpoints.create_start("test_pageserver_restart")
pageserver_http = env.pageserver.http_client()
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
@@ -53,11 +52,8 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
# pageserver does if a compute node connects and sends a request for the tenant
# while it's still in Loading state. (It waits for the loading to finish, and then
# processes the request.)
tenant_load_delay_ms = 5000
env.pageserver.stop()
env.pageserver.start(
extra_env_vars={"FAILPOINTS": f"before-loading-tenant=return({tenant_load_delay_ms})"}
)
env.pageserver.start(extra_env_vars={"FAILPOINTS": "before-loading-tenant=return(5000)"})
# Check that it's in Loading state
client = env.pageserver.http_client()
@@ -69,41 +65,6 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
cur.execute("SELECT count(*) FROM foo")
assert cur.fetchone() == (100000,)
# Validate startup time metrics
metrics = pageserver_http.get_metrics()
# Expectation callbacks: arg t is sample value, arg p is the previous phase's sample value
expectations = {
"initial": lambda t, p: True, # make no assumptions about the initial time point, it could be 0 in theory
# Initial tenant load should reflect the delay we injected
"initial_tenant_load": lambda t, p: t >= (tenant_load_delay_ms / 1000.0) and t >= p,
# Subsequent steps should occur in expected order
"initial_logical_sizes": lambda t, p: t > 0 and t >= p,
"background_jobs_can_start": lambda t, p: t > 0 and t >= p,
"complete": lambda t, p: t > 0 and t >= p,
}
prev_value = None
for sample in metrics.query_all("pageserver_startup_duration_seconds"):
labels = dict(sample.labels)
phase = labels["phase"]
log.info(f"metric {phase}={sample.value}")
assert phase in expectations, f"Unexpected phase {phase}"
assert expectations[phase](
sample.value, prev_value
), f"Unexpected value for {phase}: {sample.value}"
prev_value = sample.value
# Startup is complete, this metric should exist but be zero
assert metrics.query_one("pageserver_startup_is_loading").value == 0
# This histogram should have been populated, although we aren't specific about exactly
# which bucket values: just nonzero
assert any(
bucket.value > 0
for bucket in metrics.query_all("pageserver_tenant_activation_seconds_bucket")
)
# Test that repeatedly kills and restarts the page server, while the
# safekeeper and compute node keep running.

View File

@@ -245,7 +245,7 @@ def test_restarts_frequent_checkpoints(neon_env_builder: NeonEnvBuilder):
# we try to simulate large (flush_lsn - truncate_lsn) lag, to test that WAL segments
# are not removed before broadcasted to all safekeepers, with the help of replication slot
asyncio.run(
run_restarts_under_load(env, endpoint, env.safekeepers, period_time=15, iterations=4)
run_restarts_under_load(env, endpoint, env.safekeepers, period_time=15, iterations=5)
)