Compare commits

..

1 Commits

Author SHA1 Message Date
Heikki Linnakangas
8d8c72803a RFC: move decision-making of desired VM size to VM monitor 2024-06-19 16:46:28 +03:00
21 changed files with 694 additions and 235 deletions

View File

@@ -183,7 +183,8 @@ runs:
# Run the tests.
#
# --alluredir saves test results in Allure format (in a specified directory)
# The junit.xml file allows CI tools to display more fine-grained test information
# in its "Tests" tab in the results page.
# --verbose prints name of each test (helpful when there are
# multiple tests in one file)
# -rA prints summary in the end
@@ -192,6 +193,7 @@ runs:
#
mkdir -p $TEST_OUTPUT/allure/results
"${cov_prefix[@]}" ./scripts/pytest \
--junitxml=$TEST_OUTPUT/junit.xml \
--alluredir=$TEST_OUTPUT/allure/results \
--tb=short \
--verbose \

View File

@@ -36,16 +36,15 @@ jobs:
fail_on_error: true
filter_mode: nofilter
level: error
- name: Disallow 'ubuntu-latest' runners
run: |
- run: |
PAT='^\s*runs-on:.*-latest'
if grep -ERq $PAT .github/workflows; then
if grep -ERq $PAT .github/workflows
then
grep -ERl $PAT .github/workflows |\
while read -r f
do
l=$(grep -nE $PAT .github/workflows/release.yml | awk -F: '{print $1}' | head -1)
echo "::error file=$f,line=$l::Please use 'ubuntu-22.04' instead of 'ubuntu-latest'"
echo "::error file=$f,line=$l::Please, do not use ubuntu-latest images to run on, use LTS instead."
done
exit 1
fi

View File

@@ -1023,18 +1023,6 @@ jobs:
with:
fetch-depth: 0
# Use custom DOCKER_CONFIG directory to avoid conflicts with default settings
# The default value is ~/.docker
- name: Set custom docker config directory
run: |
mkdir -p .docker-custom
echo DOCKER_CONFIG=$(pwd)/.docker-custom >> $GITHUB_ENV
- uses: docker/login-action@v3
with:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
# `neondatabase/neon` contains multiple binaries, all of them use the same input for the version into the same version formatting library.
# Pick pageserver as currently the only binary with extra "version" features printed in the string to verify.
# Regular pageserver version string looks like
@@ -1069,11 +1057,6 @@ jobs:
docker compose -f ./docker-compose/docker-compose.yml logs || 0
docker compose -f ./docker-compose/docker-compose.yml down
- name: Remove custom docker config directory
if: always()
run: |
rm -rf .docker-custom
promote-images:
needs: [ check-permissions, tag, test-images, vm-compute-node-image ]
runs-on: ubuntu-22.04

View File

@@ -52,15 +52,13 @@ jobs:
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
run: |
TITLE="Storage & Compute release ${RELEASE_DATE}"
cat << EOF > body.md
## ${TITLE}
## Storage & Compute release ${RELEASE_DATE}
**Please merge this Pull Request using 'Create a merge commit' button**
EOF
gh pr create --title "${TITLE}" \
gh pr create --title "Release ${RELEASE_DATE}" \
--body-file "body.md" \
--head "${RELEASE_BRANCH}" \
--base "release"
@@ -93,15 +91,13 @@ jobs:
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
run: |
TITLE="Proxy release ${RELEASE_DATE}"
cat << EOF > body.md
## ${TITLE}
## Proxy release ${RELEASE_DATE}
**Please merge this Pull Request using 'Create a merge commit' button**
EOF
gh pr create --title "${TITLE}" \
gh pr create --title "Proxy release ${RELEASE_DATE}" \
--body-file "body.md" \
--head "${RELEASE_BRANCH}" \
--base "release-proxy"

View File

@@ -45,7 +45,6 @@ use std::{thread, time::Duration};
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Arg;
use compute_tools::lsn_lease::launch_lsn_lease_loop_for_static;
use signal_hook::consts::{SIGQUIT, SIGTERM};
use signal_hook::{consts::SIGINT, iterator::Signals};
use tracing::{error, info, warn};
@@ -364,8 +363,6 @@ fn wait_spec(
state.start_time = now;
}
launch_lsn_lease_loop_for_static(&compute);
Ok(WaitSpecResult {
compute,
http_port,

View File

@@ -11,7 +11,6 @@ pub mod logger;
pub mod catalog;
pub mod compute;
pub mod extension_server;
pub mod lsn_lease;
pub mod monitor;
pub mod params;
pub mod pg_helpers;

View File

@@ -1,113 +0,0 @@
use anyhow::bail;
use anyhow::Result;
use postgres::{NoTls, SimpleQueryMessage};
use std::{
str::FromStr,
sync::Arc,
thread,
time::{Duration, SystemTime},
};
use compute_api::spec::ComputeMode;
use tracing::{error, info};
use utils::lsn::Lsn;
use crate::compute::{ComputeNode, ComputeState};
pub fn launch_lsn_lease_loop_for_static(compute: &Arc<ComputeNode>) {
let lsn = {
let state = compute.state.lock().unwrap();
let spec = state.pspec.as_ref().expect("spec must be set");
match spec.spec.mode {
ComputeMode::Static(lsn) => lsn,
_ => return,
}
};
let compute = compute.clone();
thread::spawn(move || lsn_lease_loop(compute, lsn));
}
fn postgres_configs_from_state(compute_state: &ComputeState) -> Vec<postgres::Config> {
let spec = compute_state.pspec.as_ref().expect("spec must be set");
let conn_strings = spec.pageserver_connstr.split(',');
conn_strings
.map(|connstr| {
let mut config = postgres::Config::from_str(connstr).expect("invalid connstr");
if let Some(storage_auth_token) = &spec.storage_auth_token {
info!("Got storage auth token from spec file");
config.password(storage_auth_token.clone());
} else {
info!("Storage auth token not set");
}
config
})
.collect::<Vec<_>>()
}
fn lsn_lease_loop(compute: Arc<ComputeNode>, lsn: Lsn) {
loop {
let state = compute.state.lock().unwrap();
let spec = state.pspec.as_ref().expect("spec must be set");
let configs = postgres_configs_from_state(&state);
let cmd = format!("lease lsn {} {} {} ", spec.tenant_id, spec.timeline_id, lsn);
drop(state);
match lsn_lease_request(&configs, &cmd) {
Ok(valid_until) => {
let valid_until_duration = Duration::from_millis(valid_until as u64);
let sleep_duration = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.checked_sub(valid_until_duration)
.unwrap_or(Duration::ZERO)
.checked_sub(Duration::from_secs(60))
.unwrap_or(Duration::ZERO);
// Ensure the sleep duration is at least 60 seconds to avoid busy loops
let sleep_duration = std::cmp::max(sleep_duration, Duration::from_secs(60));
info!(
"lsn_lease_request succeeded, sleeping for {} seconds",
sleep_duration.as_secs()
);
thread::sleep(sleep_duration);
}
Err(e) => {
error!("lsn_lease_request failed: {:#}", e);
thread::sleep(Duration::from_secs(10));
}
}
}
}
fn lsn_lease_request(configs: &[postgres::Config], cmd: &str) -> Result<u128> {
info!("lsn_lease_request: {}", cmd);
let valid_until = configs
.iter()
.map(|config| {
let mut client = config.connect(NoTls)?;
let res = client.simple_query(cmd)?;
let msg = match res.first() {
Some(msg) => msg,
None => bail!("empty response"),
};
let row = match msg {
SimpleQueryMessage::Row(row) => row,
_ => bail!("error parsing lsn lease response"),
};
let valid_until_str = match row.get("valid_until") {
Some(valid_until) => valid_until,
None => bail!("valid_until not found"),
};
Ok(u128::from_str(valid_until_str)?)
})
.collect::<Result<Vec<u128>>>()?
.into_iter()
.min()
.unwrap();
Ok(valid_until)
}

View File

@@ -5,3 +5,4 @@ TODO:
- shared across tenants
- store pages from layer files
- store pages from "in-memory layer"
- store materialized pages

View File

@@ -0,0 +1,181 @@
# Compute controlled autoscaling
## Summary
The proposal is to move the responsibility for deciding what the
desired size of a VM is, from the autoscaler agent to the VM monitor.
## Motivation
Currently, the decision to upscale or downscale a VM is made outside
the VM, in the autoscaler-agent, based on the load average and memory
consumption. In addition to that, the VM can override that decision by
making an explicit upscale request, which will bump up the VM size for
a certain period of time.
Moving the decision-making to the compute has several benefits:
- It allows the compute to explicitly request a certain size, which
can be useful for testing and debugging purposes. We can expose that
as SQL-callable functions to provide an "escape hatch" for the cases
where the automatic algorithm doesn't work well.
- It makes it possible for the compute to upscale _before_ making a
large memory allocation, for example, avoiding OOM. We battled with
this problem with pgvector, which makes one giant dynamic shared
memory allocation at CREATE INDEX, which would immediately OOM if
the instance wasn't already scaled up to accommodate it. We mostly
solved it by enabling swap and setting dynamic_shared_memory=mmap,
but it's fiddly. Explicitly scaling up would be more robust.
- It's simpler. Even with no change to the actual algorithm used, it
is more straightforward to measure the CPU and memory usage directly
in the VM. The vm-monitor can read the statistics directly from the
OS, while the agent needs to receive them through Vector. Also, we
already had the "emergency upscale" request codepath that the
vm-monitor could use to request immediate upscale. This proposal
eliminates that case as a separate thing, all downscale/upscale
requests follow the same path.
- We can improve the scaling algorithm to take into account more
information about query plans and resource usage, without having to
expose all the information to the agent outside the VM.
- Organizationally, the compute team has the PostgreSQL expertise to
improve the algorithm for choosing the optimal compute size in the
long run.
## Scope / Non Goals
This proposal is only about choosing the *desired* size of the
VM. There can be reasons that the agent / scheduler cannot grant the
desired resources, e.g if the host is overbooked. Downscaling to the
desired size might also not be possible, because of an inbalance in
Linux memory zones, or if there is a sudden spike in memory usage
after the desired size was last calculated. This proposal doesn't
change what happens in those cases.
What happens between the autoscaler agent, neonvm controller, and to
resize the VM is out of scope. Scheduling VMs on nodes is also out of
scope. Choosing the desired size of a VM is a purely VM-local
decision, and doesn't take into account overall system load.
## Impacted components (e.g. pageserver, safekeeper, console, etc)
VM monitor (libs/vm_monitor) and autoscaling agent.
## Proposed implementation
The current VM monitor <-> autoscaling agent protocol includes an
UpscaleRequest message. The VM monitor sends that message to request
upscaling when memory is running low. The proposal is to add a new
ScaleRequest message, which is similar to UpscaleRequest, but tells
the agent directly what the desired CPU and memory size of the VM
is. When the autoscaler agent receives that message, it tries to
upscale/downscale the VM to the size specified in the message, and
disables theusual metrics-based algorithm.
### Backwards compatibility
This proposal adds a new ScaleRequest message, but doesn't modify the
old message types. The migration path is to deploy the support for the
new message in the autoscaler agent first. After that, new computes
can start using a new version of the VM monitor, which issues
ScaleRequest messages. After all old computes have expired, the
metrics-based algorithm and the code to handle the old UpscaleRequest
message can be removed from the autocaler agent.
### Reliability, failure modes and corner cases
The VM monitor's and autoscaler agent's idea of what the desired and
current VM size is might go out of sync, e.g. if the autoscaler agent
is restarted or some messages are lost. Or if upscaling or downscaling
fails for some reason. The VM monitor will periodically resend the
ScaleRequest if the actual size of the VM doesn't match the desired
size.
### Interaction/Sequence diagram
Currently, before this proposal, there are two ways that upscaling /
downscaling can be initiated. It can be initiated by an algorithm that
runs in the autoscaler agent, which makes the decision based on metrics
received from Vector running inside the VM:
Current metrics-based scaling:
+--------+ (1) metrics +------------+
| Vector | -------------> | Autoscaler |
+--------+ | agent |
+------------+
|
|
(2) UpscaleNotification |
/ DownscaleRequest |
|
+------------+ |
| VM monitor | <------------------/
+------------+
|
|
+------+ (3) QMP plug/unplug |
| QEMU | <-------------------------/
+------+
The second way to initiate upscaling (but not downscaling!), is that
the VM monitor requests upscaling by sending an UpscaleRequest message
to the agent. The upscale request overrides the metrics-based algorithm's
decision for some period of time:
+------------+ (1) UpscaleRequest
| VM monitor | <------------------\
+------------+ |
V
+------------+
| Autoscaler |
| agent |
+------------+
|
+------+ (2) QMP hotplug |
| QEMU | <-------------------------/
+------+
The proposal is to replace the above two mechanisms with a new
mechanism that is very similar to the old UpscaleRequest message. The
difference is that the new ScaleRequest message can initiate immediate
upscaling but also downscaling, and when the new mechanism is used,
the metrics-based algorithm in the agent is disabled for the VM, and
upscaling/downscaling is only initiated when a new ScaleRequest is
received from the VM monitor.
+------------+ (1) ScaleRequest
| VM monitor | <------------------\
+------------+ |
V
+------------+
| Autoscaler |
| agent |
+------------+
|
+------+ (2) QMP hotplug |
| QEMU | <-------------------------/
+------+
### Scalability (if relevant)
n/a
### Security implications (if relevant)
This doesn't open any new communication paths.
### Unresolved questions (if relevant)
## Alternative implementation (if relevant)
## Pros/cons of proposed approaches (if relevant)
## Definition of Done (if relevant)

View File

@@ -134,7 +134,7 @@ depends on that, so if you change it, bad things will happen.
#### page_cache_size
Size of the page cache. Unit is
Size of the page cache, to hold materialized page versions. Unit is
number of 8 kB blocks. The default is 8192, which means 64 MB.
#### max_file_descriptors

View File

@@ -145,6 +145,14 @@ impl ReconstructTimeMetrics {
}
}
pub(crate) static MATERIALIZED_PAGE_CACHE_HIT_DIRECT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_materialized_cache_hits_direct_total",
"Number of cache hits from materialized page cache without redo",
)
.expect("failed to define a metric")
});
pub(crate) struct ReconstructDataTimeMetrics {
singular: Histogram,
vectored: Histogram,
@@ -174,6 +182,14 @@ pub(crate) static GET_RECONSTRUCT_DATA_TIME: Lazy<ReconstructDataTimeMetrics> =
}
});
pub(crate) static MATERIALIZED_PAGE_CACHE_HIT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_materialized_cache_hits_total",
"Number of cache hits from materialized page cache",
)
.expect("failed to define a metric")
});
pub(crate) struct GetVectoredLatency {
map: EnumMap<TaskKind, Option<Histogram>>,
}
@@ -282,8 +298,12 @@ pub(crate) static SCAN_LATENCY: Lazy<ScanLatency> = Lazy::new(|| {
});
pub(crate) struct PageCacheMetricsForTaskKind {
pub read_accesses_materialized_page: IntCounter,
pub read_accesses_immutable: IntCounter,
pub read_hits_immutable: IntCounter,
pub read_hits_materialized_page_exact: IntCounter,
pub read_hits_materialized_page_older_lsn: IntCounter,
}
pub(crate) struct PageCacheMetrics {
@@ -316,6 +336,16 @@ pub(crate) static PAGE_CACHE: Lazy<PageCacheMetrics> = Lazy::new(|| PageCacheMet
let content_kind = <PageContentKind as enum_map::Enum>::from_usize(content_kind);
let content_kind: &'static str = content_kind.into();
PageCacheMetricsForTaskKind {
read_accesses_materialized_page: {
PAGE_CACHE_READ_ACCESSES
.get_metric_with_label_values(&[
task_kind,
"materialized_page",
content_kind,
])
.unwrap()
},
read_accesses_immutable: {
PAGE_CACHE_READ_ACCESSES
.get_metric_with_label_values(&[task_kind, "immutable", content_kind])
@@ -327,6 +357,28 @@ pub(crate) static PAGE_CACHE: Lazy<PageCacheMetrics> = Lazy::new(|| PageCacheMet
.get_metric_with_label_values(&[task_kind, "immutable", content_kind, "-"])
.unwrap()
},
read_hits_materialized_page_exact: {
PAGE_CACHE_READ_HITS
.get_metric_with_label_values(&[
task_kind,
"materialized_page",
content_kind,
"exact",
])
.unwrap()
},
read_hits_materialized_page_older_lsn: {
PAGE_CACHE_READ_HITS
.get_metric_with_label_values(&[
task_kind,
"materialized_page",
content_kind,
"older_lsn",
])
.unwrap()
},
}
}))
})),
@@ -342,6 +394,7 @@ pub(crate) struct PageCacheSizeMetrics {
pub max_bytes: UIntGauge,
pub current_bytes_immutable: UIntGauge,
pub current_bytes_materialized_page: UIntGauge,
}
static PAGE_CACHE_SIZE_CURRENT_BYTES: Lazy<UIntGaugeVec> = Lazy::new(|| {
@@ -367,6 +420,11 @@ pub(crate) static PAGE_CACHE_SIZE: Lazy<PageCacheSizeMetrics> =
.get_metric_with_label_values(&["immutable"])
.unwrap()
},
current_bytes_materialized_page: {
PAGE_CACHE_SIZE_CURRENT_BYTES
.get_metric_with_label_values(&["materialized_page"])
.unwrap()
},
});
pub(crate) mod page_cache_eviction_metrics {
@@ -2860,11 +2918,13 @@ pub fn preinitialize_metrics() {
// FIXME(4813): make it so that we have no top level metrics as this fn will easily fall out of
// order:
// - global metrics reside in a Lazy<PageserverMetrics>
// - access via crate::metrics::PS_METRICS.some_metric.inc()
// - access via crate::metrics::PS_METRICS.materialized_page_cache_hit.inc()
// - could move the statics into TimelineMetrics::new()?
// counters
[
&MATERIALIZED_PAGE_CACHE_HIT,
&MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
&UNEXPECTED_ONDEMAND_DOWNLOADS,
&WALRECEIVER_STARTED_CONNECTIONS,
&WALRECEIVER_BROKER_UPDATES,

View File

@@ -17,6 +17,7 @@
//!
//! Two types of pages are supported:
//!
//! * **Materialized pages**, filled & used by page reconstruction
//! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`].
//!
//! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but, it's append-only.
@@ -27,6 +28,9 @@
//! Page cache maps from a cache key to a buffer slot.
//! The cache key uniquely identifies the piece of data that is being cached.
//!
//! The cache key for **materialized pages** is [`TenantShardId`], [`TimelineId`], [`Key`], and [`Lsn`].
//! Use [`PageCache::memorize_materialized_page`] and [`PageCache::lookup_materialized_page`] for fill & access.
//!
//! The cache key for **immutable file** pages is [`FileId`] and a block number.
//! Users of page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following:
//! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`].
@@ -78,10 +82,13 @@ use std::{
use anyhow::Context;
use once_cell::sync::OnceCell;
use pageserver_api::shard::TenantShardId;
use utils::{id::TimelineId, lsn::Lsn};
use crate::{
context::RequestContext,
metrics::{page_cache_eviction_metrics, PageCacheSizeMetrics},
repository::Key,
};
static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
@@ -132,7 +139,33 @@ pub fn next_file_id() -> FileId {
#[derive(Debug, PartialEq, Eq, Clone)]
#[allow(clippy::enum_variant_names)]
enum CacheKey {
ImmutableFilePage { file_id: FileId, blkno: u32 },
MaterializedPage {
hash_key: MaterializedPageHashKey,
lsn: Lsn,
},
ImmutableFilePage {
file_id: FileId,
blkno: u32,
},
}
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
struct MaterializedPageHashKey {
/// Why is this TenantShardId rather than TenantId?
///
/// Usually, the materialized value of a page@lsn is identical on any shard in the same tenant. However, this
/// this not the case for certain internally-generated pages (e.g. relation sizes). In future, we may make this
/// key smaller by omitting the shard, if we ensure that reads to such pages always skip the cache, or are
/// special-cased in some other way.
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
key: Key,
}
#[derive(Clone)]
struct Version {
lsn: Lsn,
slot_idx: usize,
}
struct Slot {
@@ -203,6 +236,17 @@ impl SlotInner {
}
pub struct PageCache {
/// This contains the mapping from the cache key to buffer slot that currently
/// contains the page, if any.
///
/// TODO: This is protected by a single lock. If that becomes a bottleneck,
/// this HashMap can be replaced with a more concurrent version, there are
/// plenty of such crates around.
///
/// If you add support for caching different kinds of objects, each object kind
/// can have a separate mapping map, next to this field.
materialized_page_map: std::sync::RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,
/// The actual buffers with their metadata.
@@ -327,14 +371,175 @@ pub enum ReadBufResult<'a> {
}
impl PageCache {
//
// Section 1.1: Public interface functions for looking up and memorizing materialized page
// versions in the page cache
//
/// Look up a materialized page version.
///
/// The 'lsn' is an upper bound, this will return the latest version of
/// the given block, but not newer than 'lsn'. Returns the actual LSN of the
/// returned page.
pub async fn lookup_materialized_page(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
key: &Key,
lsn: Lsn,
ctx: &RequestContext,
) -> Option<(Lsn, PageReadGuard)> {
let Ok(permit) = self.try_get_pinned_slot_permit().await else {
return None;
};
crate::metrics::PAGE_CACHE
.for_ctx(ctx)
.read_accesses_materialized_page
.inc();
let mut cache_key = CacheKey::MaterializedPage {
hash_key: MaterializedPageHashKey {
tenant_shard_id,
timeline_id,
key: *key,
},
lsn,
};
if let Some(guard) = self
.try_lock_for_read(&mut cache_key, &mut Some(permit))
.await
{
if let CacheKey::MaterializedPage {
hash_key: _,
lsn: available_lsn,
} = cache_key
{
if available_lsn == lsn {
crate::metrics::PAGE_CACHE
.for_ctx(ctx)
.read_hits_materialized_page_exact
.inc();
} else {
crate::metrics::PAGE_CACHE
.for_ctx(ctx)
.read_hits_materialized_page_older_lsn
.inc();
}
Some((available_lsn, guard))
} else {
panic!("unexpected key type in slot");
}
} else {
None
}
}
///
/// Store an image of the given page in the cache.
///
pub async fn memorize_materialized_page(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
key: Key,
lsn: Lsn,
img: &[u8],
) -> anyhow::Result<()> {
let cache_key = CacheKey::MaterializedPage {
hash_key: MaterializedPageHashKey {
tenant_shard_id,
timeline_id,
key,
},
lsn,
};
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
loop {
// First check if the key already exists in the cache.
if let Some(slot_idx) = self.search_mapping_exact(&cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
// that it's still what we expected (because we don't released the mapping
// lock already, another thread could have evicted the page)
let slot = &self.slots[slot_idx];
let inner = slot.inner.write().await;
if inner.key.as_ref() == Some(&cache_key) {
slot.inc_usage_count();
debug_assert!(
{
let guard = inner.permit.lock().unwrap();
guard.upgrade().is_none()
},
"we hold a write lock, so, no one else should have a permit"
);
debug_assert_eq!(inner.buf.len(), img.len());
// We already had it in cache. Another thread must've put it there
// concurrently. Check that it had the same contents that we
// replayed.
assert!(inner.buf == img);
return Ok(());
}
}
debug_assert!(permit.is_some());
// Not found. Find a victim buffer
let (slot_idx, mut inner) = self
.find_victim(permit.as_ref().unwrap())
.await
.context("Failed to find evict victim")?;
// Insert mapping for this. At this point, we may find that another
// thread did the same thing concurrently. In that case, we evicted
// our victim buffer unnecessarily. Put it into the free list and
// continue with the slot that the other thread chose.
if let Some(_existing_slot_idx) = self.try_insert_mapping(&cache_key, slot_idx) {
// TODO: put to free list
// We now just loop back to start from beginning. This is not
// optimal, we'll perform the lookup in the mapping again, which
// is not really necessary because we already got
// 'existing_slot_idx'. But this shouldn't happen often enough
// to matter much.
continue;
}
// Make the slot ready
let slot = &self.slots[slot_idx];
inner.key = Some(cache_key.clone());
slot.set_usage_count(1);
// Create a write guard for the slot so we go through the expected motions.
debug_assert!(
{
let guard = inner.permit.lock().unwrap();
guard.upgrade().is_none()
},
"we hold a write lock, so, no one else should have a permit"
);
let mut write_guard = PageWriteGuard {
state: PageWriteGuardState::Invalid {
_permit: permit.take().unwrap(),
inner,
},
};
write_guard.copy_from_slice(img);
let _ = write_guard.mark_valid();
return Ok(());
}
}
// Section 1.2: Public interface functions for working with immutable file pages.
pub async fn read_immutable_buf(
&self,
file_id: FileId,
blkno: u32,
ctx: &RequestContext,
) -> anyhow::Result<ReadBufResult> {
self.lock_for_read(&(CacheKey::ImmutableFilePage { file_id, blkno }), ctx)
.await
let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
self.lock_for_read(&mut cache_key, ctx).await
}
//
@@ -368,11 +573,19 @@ impl PageCache {
/// Look up a page in the cache.
///
/// If the search criteria is not exact, *cache_key is updated with the key
/// for exact key of the returned page. (For materialized pages, that means
/// that the LSN in 'cache_key' is updated with the LSN of the returned page
/// version.)
///
/// If no page is found, returns None and *cache_key is left unmodified.
///
async fn try_lock_for_read(
&self,
cache_key: &CacheKey,
cache_key: &mut CacheKey,
permit: &mut Option<PinnedSlotsPermit>,
) -> Option<PageReadGuard> {
let cache_key_orig = cache_key.clone();
if let Some(slot_idx) = self.search_mapping(cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
// that it's still what we expected (because we released the mapping
@@ -385,6 +598,9 @@ impl PageCache {
_permit: inner.coalesce_readers_permit(permit.take().unwrap()),
slot_guard: inner,
});
} else {
// search_mapping might have modified the search key; restore it.
*cache_key = cache_key_orig;
}
}
None
@@ -421,12 +637,15 @@ impl PageCache {
///
async fn lock_for_read(
&self,
cache_key: &CacheKey,
cache_key: &mut CacheKey,
ctx: &RequestContext,
) -> anyhow::Result<ReadBufResult> {
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
let (read_access, hit) = match cache_key {
CacheKey::MaterializedPage { .. } => {
unreachable!("Materialized pages use lookup_materialized_page")
}
CacheKey::ImmutableFilePage { .. } => (
&crate::metrics::PAGE_CACHE
.for_ctx(ctx)
@@ -498,15 +717,52 @@ impl PageCache {
/// Search for a page in the cache using the given search key.
///
/// Returns the slot index, if any.
/// Returns the slot index, if any. If the search criteria is not exact,
/// *cache_key is updated with the actual key of the found page.
///
/// NOTE: We don't hold any lock on the mapping on return, so the slot might
/// get recycled for an unrelated page immediately after this function
/// returns. The caller is responsible for re-checking that the slot still
/// contains the page with the same key before using it.
///
fn search_mapping(&self, cache_key: &CacheKey) -> Option<usize> {
fn search_mapping(&self, cache_key: &mut CacheKey) -> Option<usize> {
match cache_key {
CacheKey::MaterializedPage { hash_key, lsn } => {
let map = self.materialized_page_map.read().unwrap();
let versions = map.get(hash_key)?;
let version_idx = match versions.binary_search_by_key(lsn, |v| v.lsn) {
Ok(version_idx) => version_idx,
Err(0) => return None,
Err(version_idx) => version_idx - 1,
};
let version = &versions[version_idx];
*lsn = version.lsn;
Some(version.slot_idx)
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let map = self.immutable_page_map.read().unwrap();
Some(*map.get(&(*file_id, *blkno))?)
}
}
}
/// Search for a page in the cache using the given search key.
///
/// Like 'search_mapping, but performs an "exact" search. Used for
/// allocating a new buffer.
fn search_mapping_exact(&self, key: &CacheKey) -> Option<usize> {
match key {
CacheKey::MaterializedPage { hash_key, lsn } => {
let map = self.materialized_page_map.read().unwrap();
let versions = map.get(hash_key)?;
if let Ok(version_idx) = versions.binary_search_by_key(lsn, |v| v.lsn) {
Some(versions[version_idx].slot_idx)
} else {
None
}
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let map = self.immutable_page_map.read().unwrap();
Some(*map.get(&(*file_id, *blkno))?)
@@ -519,6 +775,27 @@ impl PageCache {
///
fn remove_mapping(&self, old_key: &CacheKey) {
match old_key {
CacheKey::MaterializedPage {
hash_key: old_hash_key,
lsn: old_lsn,
} => {
let mut map = self.materialized_page_map.write().unwrap();
if let Entry::Occupied(mut old_entry) = map.entry(old_hash_key.clone()) {
let versions = old_entry.get_mut();
if let Ok(version_idx) = versions.binary_search_by_key(old_lsn, |v| v.lsn) {
versions.remove(version_idx);
self.size_metrics
.current_bytes_materialized_page
.sub_page_sz(1);
if versions.is_empty() {
old_entry.remove_entry();
}
}
} else {
panic!("could not find old key in mapping")
}
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let mut map = self.immutable_page_map.write().unwrap();
map.remove(&(*file_id, *blkno))
@@ -535,6 +812,30 @@ impl PageCache {
/// of the existing mapping and leaves it untouched.
fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
match new_key {
CacheKey::MaterializedPage {
hash_key: new_key,
lsn: new_lsn,
} => {
let mut map = self.materialized_page_map.write().unwrap();
let versions = map.entry(new_key.clone()).or_default();
match versions.binary_search_by_key(new_lsn, |v| v.lsn) {
Ok(version_idx) => Some(versions[version_idx].slot_idx),
Err(version_idx) => {
versions.insert(
version_idx,
Version {
lsn: *new_lsn,
slot_idx,
},
);
self.size_metrics
.current_bytes_materialized_page
.add_page_sz(1);
None
}
}
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let mut map = self.immutable_page_map.write().unwrap();
match map.entry((*file_id, *blkno)) {
@@ -648,6 +949,7 @@ impl PageCache {
let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
size_metrics.max_bytes.set_page_sz(num_pages);
size_metrics.current_bytes_immutable.set_page_sz(0);
size_metrics.current_bytes_materialized_page.set_page_sz(0);
let slots = page_buffer
.chunks_exact_mut(PAGE_SZ)
@@ -666,6 +968,7 @@ impl PageCache {
.collect();
Self {
materialized_page_map: Default::default(),
immutable_page_map: Default::default(),
slots,
next_evict_slot: AtomicUsize::new(0),

View File

@@ -945,8 +945,9 @@ impl PageServerHandler {
b"valid_until",
)]))?
.write_message_noflush(&BeMessage::DataRow(&[Some(
valid_until.as_millis().to_string().as_bytes(),
)]))?;
&valid_until.as_millis().to_be_bytes(),
)]))?
.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
Ok(())
}

View File

@@ -101,7 +101,9 @@ use crate::{
use crate::config::PageServerConf;
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::metrics::TimelineMetrics;
use crate::metrics::{
TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
};
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
use crate::tenant::config::TenantConfOpt;
use pageserver_api::reltag::RelTag;
@@ -118,6 +120,7 @@ use utils::{
simple_rcu::{Rcu, RcuReadGuard},
};
use crate::page_cache;
use crate::repository::GcResult;
use crate::repository::{Key, Value};
use crate::task_mgr;
@@ -131,7 +134,7 @@ use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf;
use super::{config::TenantConf, storage_layer::VectoredValueReconstructState};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
use super::{remote_timeline_client::RemoteTimelineClient, storage_layer::ReadableLayer};
@@ -884,11 +887,32 @@ impl Timeline {
self.timeline_get_throttle.throttle(ctx, 1).await;
// Check the page cache. We will get back the most recent page with lsn <= `lsn`.
// The cached image can be returned directly if there is no WAL between the cached image
// and requested LSN. The cached image can also be used to reduce the amount of WAL needed
// for redo.
let cached_page_img = match self.lookup_cached_page(&key, lsn, ctx).await {
Some((cached_lsn, cached_img)) => {
match cached_lsn.cmp(&lsn) {
Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
Ordering::Equal => {
MATERIALIZED_PAGE_CACHE_HIT_DIRECT.inc();
return Ok(cached_img); // exact LSN match, return the image
}
Ordering::Greater => {
unreachable!("the returned lsn should never be after the requested lsn")
}
}
Some((cached_lsn, cached_img))
}
None => None,
};
match self.conf.get_impl {
GetImpl::Legacy => {
let reconstruct_state = ValueReconstructState {
records: Vec::new(),
img: None,
img: cached_page_img,
};
self.get_impl(key, lsn, reconstruct_state, ctx).await
@@ -902,6 +926,13 @@ impl Timeline {
// entry returned above.
let mut reconstruct_state = ValuesReconstructState::new();
// Only add the cached image to the reconstruct state when it exists.
if cached_page_img.is_some() {
let mut key_state = VectoredValueReconstructState::default();
key_state.img = cached_page_img;
reconstruct_state.keys.insert(key, Ok(key_state));
}
let vectored_res = self
.get_vectored_impl(keyspace.clone(), lsn, &mut reconstruct_state, ctx)
.await;
@@ -3209,6 +3240,7 @@ impl Timeline {
ValueReconstructResult::Continue => {
// If we reached an earlier cached page image, we're done.
if cont_lsn == cached_lsn + 1 {
MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
return Ok(traversal_path);
}
if let Some(prev) = prev_lsn {
@@ -3582,6 +3614,26 @@ impl Timeline {
})
}
/// # Cancel-safety
///
/// This method is cancellation-safe.
async fn lookup_cached_page(
&self,
key: &Key,
lsn: Lsn,
ctx: &RequestContext,
) -> Option<(Lsn, Bytes)> {
let cache = page_cache::get();
// FIXME: It's pointless to check the cache for things that are not 8kB pages.
// We should look at the key to determine if it's a cacheable object
let (lsn, read_guard) = cache
.lookup_materialized_page(self.tenant_shard_id, self.timeline_id, key, lsn, ctx)
.await?;
let img = Bytes::from(read_guard.to_vec());
Some((lsn, img))
}
async fn get_ready_ancestor_timeline(
&self,
ancestor: &Arc<Timeline>,
@@ -5228,6 +5280,8 @@ impl Timeline {
trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
};
let last_rec_lsn = data.records.last().unwrap().0;
let img = match self
.walredo_mgr
.as_ref()
@@ -5241,6 +5295,23 @@ impl Timeline {
Err(e) => return Err(PageReconstructError::WalRedo(e)),
};
if img.len() == page_cache::PAGE_SZ {
let cache = page_cache::get();
if let Err(e) = cache
.memorize_materialized_page(
self.tenant_shard_id,
self.timeline_id,
key,
last_rec_lsn,
&img,
)
.await
.context("Materialized page memoization failed")
{
return Err(PageReconstructError::from(e));
}
}
Ok(img)
}
}

View File

@@ -118,6 +118,8 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
"libmetrics_launch_timestamp",
"libmetrics_build_info",
"libmetrics_tracing_event_count_total",
"pageserver_materialized_cache_hits_total",
"pageserver_materialized_cache_hits_direct_total",
"pageserver_page_cache_read_hits_total",
"pageserver_page_cache_read_accesses_total",
"pageserver_page_cache_size_current_bytes",

View File

@@ -3446,12 +3446,11 @@ class Endpoint(PgProtocol, LogUtils):
self.active_safekeepers: List[int] = list(map(lambda sk: sk.id, env.safekeepers))
# path to conf is <repo_dir>/endpoints/<endpoint_id>/pgdata/postgresql.conf
# Semaphore is set to 1 when we start, and acquire'd back to zero when we stop
#
# We use a semaphore rather than a bool so that racing calls to stop() don't
# try and stop the same process twice, as stop() is called by test teardown and
# potentially by some __del__ chains in other threads.
self._running = threading.Semaphore(0)
# This lock prevents concurrent start & stop operations, keeping `self.running` consistent
# with whether we're really running. Tests generally wouldn't try and do these concurrently,
# but endpoints are also stopped during test teardown, which might happen concurrently with
# destruction of objects in tests.
self.lock = threading.Lock()
def http_client(
self, auth_token: Optional[str] = None, retries: Optional[Retry] = None
@@ -3523,14 +3522,15 @@ class Endpoint(PgProtocol, LogUtils):
log.info(f"Starting postgres endpoint {self.endpoint_id}")
self.env.neon_cli.endpoint_start(
self.endpoint_id,
safekeepers=self.active_safekeepers,
remote_ext_config=remote_ext_config,
pageserver_id=pageserver_id,
allow_multiple=allow_multiple,
)
self._running.release(1)
with self.lock:
self.env.neon_cli.endpoint_start(
self.endpoint_id,
safekeepers=self.active_safekeepers,
remote_ext_config=remote_ext_config,
pageserver_id=pageserver_id,
allow_multiple=allow_multiple,
)
self.running = True
return self
@@ -3578,12 +3578,9 @@ class Endpoint(PgProtocol, LogUtils):
conf_file.write("\n".join(hba) + "\n")
conf_file.write(data)
if self.is_running():
if self.running:
self.safe_psql("SELECT pg_reload_conf()")
def is_running(self):
return self._running._value > 0
def reconfigure(self, pageserver_id: Optional[int] = None):
assert self.endpoint_id is not None
self.env.neon_cli.endpoint_reconfigure(self.endpoint_id, self.tenant_id, pageserver_id)
@@ -3632,12 +3629,13 @@ class Endpoint(PgProtocol, LogUtils):
Returns self.
"""
running = self._running.acquire(blocking=False)
if running:
assert self.endpoint_id is not None
self.env.neon_cli.endpoint_stop(
self.endpoint_id, check_return_code=self.check_stop_result, mode=mode
)
with self.lock:
if self.running:
assert self.endpoint_id is not None
self.env.neon_cli.endpoint_stop(
self.endpoint_id, check_return_code=self.check_stop_result, mode=mode
)
self.running = False
return self
@@ -3647,13 +3645,13 @@ class Endpoint(PgProtocol, LogUtils):
Returns self.
"""
running = self._running.acquire(blocking=False)
if running:
with self.lock:
assert self.endpoint_id is not None
self.env.neon_cli.endpoint_stop(
self.endpoint_id, True, check_return_code=self.check_stop_result, mode=mode
)
self.endpoint_id = None
self.running = False
return self

View File

@@ -1,5 +1,4 @@
import os
import queue
import random
import threading
import time
@@ -9,7 +8,11 @@ from fixtures.neon_fixtures import DEFAULT_BRANCH_NAME, NeonEnvBuilder
from fixtures.utils import query_scalar
def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder):
def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder, build_type: str):
if build_type == "debug":
# Disable vectored read path cross validation since it makes the test time out.
neon_env_builder.pageserver_config_override = "validate_vectored_get=false"
env = neon_env_builder.init_start()
cache_dir = os.path.join(env.repo_dir, "file_cache")
@@ -30,10 +33,11 @@ def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder):
cur = endpoint.connect().cursor()
stop = threading.Event()
n_rows = 100000
n_threads = 20
n_updates_per_thread = 10000
n_updates_per_connection = 1000
n_total_updates = n_threads * n_updates_per_thread
cur.execute("CREATE TABLE lfctest (id int4 PRIMARY KEY, n int) WITH (fillfactor=10)")
cur.execute(f"INSERT INTO lfctest SELECT g, 1 FROM generate_series(1, {n_rows}) g")
@@ -44,11 +48,11 @@ def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder):
# performed (plus the initial 1 on each row).
#
# Furthermore, each thread will reconnect between every 1000 updates.
def run_updates(n_updates_performed_q: queue.Queue[int]):
def run_updates():
n_updates_performed = 0
conn = endpoint.connect()
cur = conn.cursor()
while not stop.is_set():
for _ in range(n_updates_per_thread):
id = random.randint(1, n_rows)
cur.execute(f"UPDATE lfctest SET n = n + 1 WHERE id = {id}")
n_updates_performed += 1
@@ -57,28 +61,19 @@ def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder):
conn.close()
conn = endpoint.connect()
cur = conn.cursor()
n_updates_performed_q.put(n_updates_performed)
n_updates_performed_q: queue.Queue[int] = queue.Queue()
threads: List[threading.Thread] = []
for _i in range(n_threads):
thread = threading.Thread(target=run_updates, args=(n_updates_performed_q,), daemon=True)
thread = threading.Thread(target=run_updates, args=(), daemon=True)
thread.start()
threads.append(thread)
time.sleep(5)
# unlink, this is what we're actually testing
new_cache_dir = os.path.join(env.repo_dir, "file_cache_new")
os.rename(cache_dir, new_cache_dir)
time.sleep(10)
stop.set()
n_updates_performed = 0
for thread in threads:
thread.join()
n_updates_performed += n_updates_performed_q.get()
assert query_scalar(cur, "SELECT SUM(n) FROM lfctest") == n_rows + n_updates_performed
assert query_scalar(cur, "SELECT SUM(n) FROM lfctest") == n_total_updates + n_rows

View File

@@ -88,9 +88,6 @@ def test_ondemand_download_replica(neon_env_builder: NeonEnvBuilder, shard_count
initial_tenant_conf=tenant_conf, initial_tenant_shard_count=shard_count
)
for ps in env.pageservers:
ps.allowed_errors.append(".*page_service.*error obtaining lsn lease.*Tenant .* not found")
endpoint = env.endpoints.create_start("main")
pg_conn = endpoint.connect()

View File

@@ -23,11 +23,11 @@ if TYPE_CHECKING:
# Run the main PostgreSQL regression tests, in src/test/regress.
#
@pytest.mark.timeout(600)
@pytest.mark.parametrize("shard_count", [None, 4])
def test_pg_regress(
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,
build_type: str,
pg_bin: PgBin,
capsys: CaptureFixture[str],
base_dir: Path,
@@ -43,6 +43,10 @@ def test_pg_regress(
if shard_count is not None:
neon_env_builder.num_pageservers = shard_count
if build_type == "debug":
# Disable vectored read path cross validation since it makes the test time out.
neon_env_builder.pageserver_config_override = "validate_vectored_get=false"
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.enable_scrub_on_exit()
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)

View File

@@ -6,6 +6,7 @@ from fixtures.neon_fixtures import NeonEnv
def test_physical_replication(neon_simple_env: NeonEnv):
env = neon_simple_env
n_records = 100000
with env.endpoints.create_start(
branch_name="main",
endpoint_id="primary",
@@ -21,20 +22,8 @@ def test_physical_replication(neon_simple_env: NeonEnv):
with p_con.cursor() as p_cur:
with secondary.connect() as s_con:
with s_con.cursor() as s_cur:
runtime_secs = 30
started_at = time.time()
pk = 0
while True:
pk += 1
now = time.time()
if now - started_at > runtime_secs:
break
for pk in range(n_records):
p_cur.execute("insert into t (pk) values (%s)", (pk,))
# an earlier version of this test was based on a fixed number of loop iterations
# and selected for pk=(random.randrange(1, fixed number of loop iterations)).
# => the probability of selection for a value that was never inserted changed from 99.9999% to 0% over the course of the test.
#
# We changed the test to where=(random.randrange(1, 2*pk)), which means the probability is now fixed to 50%.
s_cur.execute(
"select * from t where pk=%s", (random.randrange(1, 2 * pk),)
"select * from t where pk=%s", (random.randrange(1, n_records),)
)

View File

@@ -6,24 +6,18 @@ from fixtures.pageserver.utils import wait_for_last_record_lsn
from fixtures.utils import query_scalar
#
# Create read-only compute nodes, anchored at historical points in time.
#
# This is very similar to the 'test_branch_behind' test, but instead of
# creating branches, creates read-only nodes.
#
def test_readonly_node(neon_simple_env: NeonEnv):
"""
Create read-only compute nodes, anchored at historical points in time.
This is very similar to the 'test_branch_behind' test, but instead of
creating branches, creates read-only nodes.
"""
env = neon_simple_env
env.neon_cli.create_branch("test_readonly_node", "empty")
endpoint_main = env.endpoints.create_start("test_readonly_node")
env.pageserver.allowed_errors.extend(
[
".*basebackup .* failed: invalid basebackup lsn.*",
".*page_service.*error obtaining lsn lease.*.*tried to request a page version that was garbage collected",
]
)
env.pageserver.allowed_errors.append(".*basebackup .* failed: invalid basebackup lsn.*")
main_pg_conn = endpoint_main.connect()
main_cur = main_pg_conn.cursor()