Mirror of https://github.com/neondatabase/neon.git (synced 2025-12-22 21:59:59 +00:00)
remove materialized page cache (#8105)

Part of Epic https://github.com/neondatabase/neon/issues/7386.

# Motivation

The materialized page cache adds complexity to the code base, which increases the maintenance burden and the risk of subtle, hard-to-reproduce bugs such as #8050.

Further, the best hit rate we currently achieve in production is ca. 1% of materialized page cache lookups for `task_kind=PageRequestHandler`. Other task kinds have hit rates <0.2%.

Last, caching page images in Pageserver rewards under-sized caches in Computes, because reading from Pageserver's materialized page cache over the network is often sufficiently fast (low hundreds of microseconds). Such Computes should upscale their local caches to fit their working set, rather than repeatedly requesting the same page from Pageserver.

Some more discussion and context in the internal thread https://neondb.slack.com/archives/C033RQ5SPDH/p1718714037708459

# Changes

This PR removes the materialized page cache code & metrics. The infrastructure for different key kinds in `PageCache` is left in place, even though the "Immutable" key kind is the only remaining one (see the sketch after this message); this can be simplified further in a future commit.

Some tests started failing because their total runtime depended on high materialized page cache hit rates. This PR makes them fixed-runtime or raises their pytest timeouts:

* test_local_file_cache_unlink
* test_physical_replication
* test_pg_regress

# Performance

I focused on ensuring that this PR will not result in a performance regression in prod.

* **getpage** requests: our production metrics have shown the materialized page cache to be irrelevant (low hit rate). Also, Pageserver is the wrong place to cache page images; it should happen in compute.
* **ingest** (`task_kind=WalReceiverConnectionHandler`): prod metrics show a 0% hit rate, so removing the cache is not a regression.
* **get_lsn_by_timestamp**: an important API for branch creation, used by the control plane. The clog pages that this code uses are not materialized-page-cached because they are not 8k pages, so there is no risk of introducing a regression here.

We will watch the various nightly benchmarks closely for more results before shipping to prod.
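As referenced in the Changes section above, only the immutable-file key kind survives in `PageCache`. A minimal, hedged sketch of that remaining key space (stub types: `FileId = u64`, `slot_for`, and the bare `HashMap` are invented for this example; the real cache wraps the mapping in slots, pin permits, and eviction):

```rust
// Sketch only, not the pageserver implementation: after this PR the page
// cache has a single key kind, keyed by (file, block number).
use std::collections::HashMap;

type FileId = u64; // stub for pageserver's opaque per-file id

#[derive(Debug, PartialEq, Eq, Clone)]
enum CacheKey {
    ImmutableFilePage { file_id: FileId, blkno: u32 },
}

// Exact-match lookup: no per-key version list, no "latest LSN <= x" search.
fn slot_for(map: &HashMap<(FileId, u32), usize>, key: &CacheKey) -> Option<usize> {
    match key {
        CacheKey::ImmutableFilePage { file_id, blkno } => map.get(&(*file_id, *blkno)).copied(),
    }
}

fn main() {
    let mut immutable_page_map: HashMap<(FileId, u32), usize> = HashMap::new();
    immutable_page_map.insert((1, 42), 7);

    let key = CacheKey::ImmutableFilePage { file_id: 1, blkno: 42 };
    assert_eq!(slot_for(&immutable_page_map, &key), Some(7));
    println!("slot = {:?}", slot_for(&immutable_page_map, &key));
}
```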
Commit 79401638df · parent c789ec21f6 (committed via GitHub)
# Diff

The diff is summarized below by affected area; file names are not preserved in this mirror, so each subsection is labeled by its content.

## Page cache TODO notes

* `@@ -5,4 +5,3 @@ TODO:` — drops `- store materialized pages` from the TODO list; `- shared across tenants`, `- store pages from layer files`, and `- store pages from "in-memory layer"` remain.
## Configuration docs

* `@@ -134,7 +134,7 @@ depends on that, so if you change it, bad things will happen.` — under `#### page_cache_size`, drops the phrase ", to hold materialized page versions"; the entry now reads "Size of the page cache. Unit is number of 8 kB blocks. The default is 8192, which means 64 MB." (`#### max_file_descriptors` below is unchanged).
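To make the sizing arithmetic in that description concrete — a hedged, self-contained sketch; `page_cache_bytes` is invented purely for illustration:

```rust
// page_cache_size counts 8 KiB slots ("8 kB blocks" in the docs), so the
// documented default of 8192 slots corresponds to 64 MiB of cache memory.
const PAGE_SZ: usize = 8192; // bytes per slot

fn page_cache_bytes(page_cache_size: usize) -> usize {
    page_cache_size * PAGE_SZ
}

fn main() {
    assert_eq!(page_cache_bytes(8192), 64 * 1024 * 1024); // the documented default: 64 MB
    println!("{} bytes", page_cache_bytes(8192));
}
```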
## Pageserver metrics (`crate::metrics`)

* `@@ -145,14 +145,6 @@ impl ReconstructTimeMetrics {` — removes the `MATERIALIZED_PAGE_CACHE_HIT_DIRECT` counter (`pageserver_materialized_cache_hits_direct_total`, "Number of cache hits from materialized page cache without redo").
* `@@ -182,14 +174,6 @@ pub(crate) static GET_RECONSTRUCT_DATA_TIME: Lazy<ReconstructDataTimeMetrics> =` — removes the `MATERIALIZED_PAGE_CACHE_HIT` counter (`pageserver_materialized_cache_hits_total`, "Number of cache hits from materialized page cache").
* `@@ -298,12 +282,8 @@ pub(crate) static SCAN_LATENCY: Lazy<ScanLatency> = Lazy::new(|| {` — drops the `read_accesses_materialized_page`, `read_hits_materialized_page_exact`, and `read_hits_materialized_page_older_lsn` fields from `PageCacheMetricsForTaskKind`; `read_accesses_immutable` and `read_hits_immutable` remain.
* `@@ -336,16 +316,6 @@` and `@@ -357,28 +327,6 @@` (construction of `PAGE_CACHE: Lazy<PageCacheMetrics>`) — drop the initializers that registered the `"materialized_page"` label (with the `"exact"` and `"older_lsn"` hit kinds) on `PAGE_CACHE_READ_ACCESSES` / `PAGE_CACHE_READ_HITS`; the `"immutable"` initializers are unchanged.
* `@@ -394,7 +342,6 @@ pub(crate) struct PageCacheSizeMetrics {` and `@@ -420,11 +367,6 @@` — remove the `current_bytes_materialized_page` gauge (the `"materialized_page"` label on `PAGE_CACHE_SIZE_CURRENT_BYTES`); `max_bytes` and `current_bytes_immutable` remain.
* `@@ -2918,13 +2860,11 @@ pub fn preinitialize_metrics() {` — drops `&MATERIALIZED_PAGE_CACHE_HIT` and `&MATERIALIZED_PAGE_CACHE_HIT_DIRECT` from the preinitialized counters and rewords the example in the FIXME(4813) comment to `crate::metrics::PS_METRICS.some_metric.inc()`.
## Page cache (`crate::page_cache`)

* `@@ -17,7 +17,6 @@` and `@@ -28,9 +27,6 @@` — module docs: drop "**Materialized pages**, filled & used by page reconstruction" from the list of supported page types, and drop the description of the materialized-page cache key ([`TenantShardId`], [`TimelineId`], [`Key`], [`Lsn`]) and its fill/access functions `PageCache::memorize_materialized_page` / `PageCache::lookup_materialized_page`; the **Immutable File pages** docs remain.
* `@@ -82,13 +78,10 @@ use std::{` — removes the now-unused imports `pageserver_api::shard::TenantShardId`, `utils::{id::TimelineId, lsn::Lsn}`, and `repository::Key`.
* `@@ -139,33 +132,7 @@ pub fn next_file_id()` — `enum CacheKey` loses its `MaterializedPage { hash_key: MaterializedPageHashKey, lsn: Lsn }` variant and keeps only `ImmutableFilePage { file_id: FileId, blkno: u32 }`; `struct MaterializedPageHashKey { tenant_shard_id, timeline_id, key }` (with its comment on why the key is a `TenantShardId` rather than a `TenantId`) and `struct Version { lsn, slot_idx }` are deleted.
* `@@ -236,17 +203,6 @@ impl SlotInner` — `pub struct PageCache` loses the `materialized_page_map: std::sync::RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>` field (and its TODO about replacing the single-lock `HashMap` with a more concurrent map); `immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>` remains.
* `@@ -371,175 +327,14 @@ pub enum ReadBufResult<'a>` — deletes the whole "Section 1.1" public interface: `lookup_materialized_page(tenant_shard_id, timeline_id, key, lsn, ctx) -> Option<(Lsn, PageReadGuard)>` (including its exact vs. older-LSN hit accounting) and `memorize_materialized_page(tenant_shard_id, timeline_id, key, lsn, img) -> anyhow::Result<()>` (its find-victim/insert-mapping loop and duplicate-image assertion). `read_immutable_buf` now calls `self.lock_for_read(&(CacheKey::ImmutableFilePage { file_id, blkno }), ctx).await` directly instead of building a mutable key first.
* `@@ -573,19 +368,11 @@`, `@@ -598,9 +385,6 @@`, `@@ -637,15 +421,12 @@`, and `@@ -717,52 +498,15 @@` — `try_lock_for_read`, `lock_for_read`, and `search_mapping` now take `&CacheKey` instead of `&mut CacheKey` (sketched below): the removed materialized-page branch searched the LSN-sorted `Vec<Version>` for the newest version at or before the requested LSN and rewrote the key with the LSN found, and `lock_for_read` no longer needs its `unreachable!("Materialized pages use lookup_materialized_page")` arm. `search_mapping_exact` is deleted entirely.
* `@@ -775,27 +519,6 @@` and `@@ -812,30 +535,6 @@` — `remove_mapping` and `try_insert_mapping` lose their `CacheKey::MaterializedPage` branches (version-vector bookkeeping plus the `current_bytes_materialized_page` size accounting); only the `ImmutableFilePage` branches remain.
* `@@ -949,7 +648,6 @@` and `@@ -968,7 +666,6 @@` — cache construction no longer calls `size_metrics.current_bytes_materialized_page.set_page_sz(0)` and no longer initializes `materialized_page_map`.
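The `&mut CacheKey` → `&CacheKey` signature changes above follow directly from the removal: only materialized-page lookups ever mutated the key. A hedged, self-contained sketch of the now-removed "latest version at or before LSN" search (stub `Version`, with `u64` standing in for `Lsn`):

```rust
// Sketch of the behavior this PR removes: materialized-page lookups searched a
// per-key, LSN-sorted Vec<Version> for the newest version at or before the
// requested LSN and then rewrote the caller's key with the LSN found.
// Immutable-file keys are exact, so lookups no longer need a mutable key.
#[derive(Debug, Clone, Copy)]
struct Version {
    lsn: u64,
    slot_idx: usize,
}

fn latest_at_or_before(versions: &[Version], lsn: u64) -> Option<Version> {
    match versions.binary_search_by_key(&lsn, |v| v.lsn) {
        Ok(i) => Some(versions[i]),      // exact LSN match
        Err(0) => None,                  // every cached version is newer than `lsn`
        Err(i) => Some(versions[i - 1]), // newest version older than `lsn`
    }
}

fn main() {
    let versions = [
        Version { lsn: 10, slot_idx: 0 },
        Version { lsn: 20, slot_idx: 1 },
    ];
    assert_eq!(latest_at_or_before(&versions, 15).map(|v| v.slot_idx), Some(0));
    assert_eq!(latest_at_or_before(&versions, 20).map(|v| v.slot_idx), Some(1));
    assert!(latest_at_or_before(&versions, 5).is_none());
}
```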
## `Timeline` read path (tenant timeline)

* `@@ -101,9 +101,7 @@ use crate::{`, `@@ -120,7 +118,6 @@`, and `@@ -134,7 +131,7 @@` — imports: `use crate::metrics::{TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT}` shrinks to `use crate::metrics::TimelineMetrics`, `use crate::page_cache;` is removed, and `use super::{config::TenantConf, storage_layer::VectoredValueReconstructState}` shrinks to `use super::config::TenantConf`.
* `@@ -887,32 +884,11 @@ impl Timeline {` — `Timeline::get` no longer consults the page cache: the block that called `self.lookup_cached_page(&key, lsn, ctx)`, returned the image directly on an exact LSN match (bumping `MATERIALIZED_PAGE_CACHE_HIT_DIRECT`), and otherwise kept the older image as a redo base is deleted. The legacy path now builds `ValueReconstructState { records: Vec::new(), img: None }` (see the sketch after this section).
* `@@ -926,13 +902,6 @@` — the vectored path likewise stops seeding its `ValuesReconstructState` with a cached image (the `VectoredValueReconstructState::default()` / `key_state.img = cached_page_img` block is removed).
* `@@ -3240,7 +3209,6 @@` — the layer-traversal `ValueReconstructResult::Continue` arm still stops at `cont_lsn == cached_lsn + 1` but no longer increments `MATERIALIZED_PAGE_CACHE_HIT`.
* `@@ -3614,26 +3582,6 @@` — deletes `async fn lookup_cached_page(&self, key, lsn, ctx) -> Option<(Lsn, Bytes)>` (including its cancel-safety note and the FIXME about pointlessly checking the cache for non-8kB keys).
* `@@ -5280,8 +5228,6 @@` and `@@ -5295,23 +5241,6 @@` — after WAL redo, the reconstructed image is no longer memorized: `let last_rec_lsn = data.records.last().unwrap().0;` and the `if img.len() == page_cache::PAGE_SZ { cache.memorize_materialized_page(...) }` block (with its "Materialized page memoization failed" error context) are removed.
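With the lookup gone, both read paths in `Timeline::get` start reconstruction from an empty state. A minimal sketch with stub types (the field types below are simplified stand-ins; only the field names follow the diff):

```rust
// Stub types for illustration only.
#[derive(Debug)]
struct ValueReconstructState {
    records: Vec<Vec<u8>>, // WAL records still to apply
    img: Option<Vec<u8>>,  // optional base page image to start redo from
}

fn main() {
    // Before this PR the legacy path seeded `img` with the materialized-page-cache
    // image on a hit; now both paths begin with no base image.
    let state = ValueReconstructState {
        records: Vec::new(),
        img: None,
    };
    println!("{:?}", state);
}
```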
## Test fixtures (`PAGESERVER_GLOBAL_METRICS`)

* `@@ -118,8 +118,6 @@` — removes `"pageserver_materialized_cache_hits_total"` and `"pageserver_materialized_cache_hits_direct_total"` from the expected global metrics; the remaining `pageserver_page_cache_*` entries are unchanged.
## `test_local_file_cache_unlink`

* `@@ -1,4 +1,5 @@` — adds `import queue`.
* `@@ -8,11 +9,7 @@` — drops the `build_type` parameter and the debug-only `neon_env_builder.pageserver_config_override = "validate_vectored_get=false"` workaround.
* `@@ -33,11 +30,10 @@` and `@@ -48,11 +44,11 @@` — the workload becomes fixed-runtime: a `stop = threading.Event()` replaces the fixed `n_updates_per_thread = 10000` / `n_total_updates` budget, and `run_updates` now takes a `queue.Queue[int]` and loops `while not stop.is_set():` instead of `for _ in range(n_updates_per_thread):`.
* `@@ -61,19 +57,28 @@` — each worker reports its count via `n_updates_performed_q.put(n_updates_performed)`; the main thread renames the cache directory (the unlink actually under test), sleeps 10 s, calls `stop.set()`, sums the per-thread counts after `thread.join()`, and asserts `SELECT SUM(n) FROM lfctest == n_rows + n_updates_performed` instead of the old fixed total.
## `test_pg_regress`

* `@@ -23,11 +23,11 @@` — adds `@pytest.mark.timeout(600)` above the `shard_count` parametrization and drops the `build_type` parameter.
* `@@ -43,10 +43,6 @@` — removes the debug-only `validate_vectored_get=false` pageserver config override.
## `test_physical_replication`

* `@@ -6,7 +6,6 @@` — removes the fixed `n_records = 100000` workload size.
* `@@ -22,8 +21,20 @@` — the primary/secondary loop now runs for a fixed `runtime_secs = 30` wall-clock budget (`while True:` incrementing `pk` and breaking on a `time.time()` check) instead of `for pk in range(n_records):`, and the secondary's probe becomes `select * from t where pk=%s` with `random.randrange(1, 2 * pk)`. The new in-test comment explains why: with the fixed-iteration version, the probability of selecting a never-inserted key drifted from 99.9999% to 0% over the course of the test, whereas `2 * pk` pins it at 50%.