Compare commits


1 Commit

Author SHA1 Message Date
Heikki Linnakangas
d6ec0456e8 Add 'neon_seqscan_rel' test function, to test sequential scan performance.
Usage:

    postgres=# \timing
    Timing is on.
    postgres=# select neon_seqscan_rel('pgbench_accounts', 1000);
    INFO:  scanning 491804 blocks, prefetch 1000
    INFO:  blk 0/491804
    INFO:  blk 1024/491804
    INFO:  blk 2048/491804
    INFO:  blk 3072/491804
    ...
    INFO:  blk 489472/491804
    INFO:  blk 490496/491804
    INFO:  blk 491520/491804
     neon_seqscan_rel
    ------------------

    (1 row)

    Time: 57517.979 ms (00:57.518)

The second argument to the function is the number of pages to prefetch.
Note: the prefetching in this function works differently from the
prefetching we have for sequential scans in 'main'. After receiving the
result for a block, it immediately sends the request for the next page;
it doesn't send them in batches like 'main' does.
2022-10-21 13:30:35 +03:00
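To make the prefetch pipelining concrete, here is a minimal, self-contained
sketch of the keep-N-requests-in-flight loop described in the note above.
The stub send/read functions stand in for the libpq COPY calls
(PQputCopyData/PQgetCopyData) that the extension actually uses; the counts
are illustrative only, not part of the commit.

    #include <stdio.h>

    /* Stub: the real code packs a NeonGetPageRequest and sends it. */
    static void
    send_request(unsigned blkno)
    {
        (void) blkno;
    }

    /* Stub: the real code reads and unpacks one NeonResponse. */
    static void
    read_response(unsigned blkno)
    {
        (void) blkno;
    }

    int
    main(void)
    {
        unsigned nblocks = 10;   /* blocks in the relation */
        unsigned nprefetch = 3;  /* extra requests to keep in flight */
        unsigned nsent = 0;

        for (unsigned blkno = 0; blkno < nblocks; blkno++)
        {
            /*
             * Top up the pipeline: before consuming the response for
             * 'blkno', make sure requests up to blkno + nprefetch have
             * been sent, so later iterations never wait a full round-trip.
             */
            while (nsent < blkno + nprefetch + 1 && nsent < nblocks)
                send_request(nsent++);

            /* Consume exactly one response per block scanned. */
            read_response(blkno);
            printf("blk %u/%u, %u in flight\n",
                   blkno, nblocks, nsent - blkno - 1);
        }
        return 0;
    }

With nprefetch = 0 the loop degenerates to one synchronous round-trip per
block, which is what the prefetch argument is there to avoid.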
8 changed files with 496 additions and 296 deletions

View File

@@ -1,31 +0,0 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.

image:
  repository: neondatabase/neon

settings:
  authBackend: "console"
  authEndpoint: "http://console-staging.local/management/api/v2"
  domain: "*.us-east-2.aws.neon.build"

# -- Additional labels for neon-proxy pods
podLabels:
  zenith_service: proxy-scram
  zenith_env: dev
  zenith_region: us-east-2
  zenith_region_slug: us-east-2

exposedService:
  annotations:
    service.beta.kubernetes.io/aws-load-balancer-type: external
    service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
    service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
    external-dns.alpha.kubernetes.io/hostname: us-east-2.aws.neon.build

#metrics:
#  enabled: true
#  serviceMonitor:
#    enabled: true
#    selector:
#      release: kube-prometheus-stack

View File

@@ -825,31 +825,3 @@ jobs:
          DOCKER_TAG=${{needs.tag.outputs.build-tag}}
          helm upgrade ${{ matrix.proxy_job }} neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
          helm upgrade ${{ matrix.proxy_job }}-scram neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s

  deploy-proxy-new:
    runs-on: dev
    container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
    # Compute image isn't strictly required for proxy deploy, but let's still wait for it to run all deploy jobs consistently.
    needs: [ push-docker-hub, calculate-deploy-targets, tag, regress-tests ]
    if: |
      (github.ref_name == 'main' || github.ref_name == 'release') &&
      github.event_name != 'workflow_dispatch'
    defaults:
      run:
        shell: bash
    steps:
      - name: Checkout
        uses: actions/checkout@v3
        with:
          submodules: true
          fetch-depth: 0
      - name: Configure environment
        run: |
          helm repo add neondatabase https://neondatabase.github.io/helm-charts
          aws --region us-east-2 eks update-kubeconfig --name dev-us-east-2-beta --role-arn arn:aws:iam::369495373322:role/github-runner
      - name: Re-deploy proxy
        run: |
          DOCKER_TAG=${{needs.tag.outputs.build-tag}}
          helm upgrade neon-proxy-scram neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install -f .github/helm-values/dev-us-east-2-beta.neon-proxy-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s

View File

@@ -425,28 +425,7 @@ pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> {
// Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user.
// This is needed since postgres 15, where this privilege is removed by default.
let grant_query = "DO $$\n\
BEGIN\n\
IF EXISTS(\n\
SELECT nspname\n\
FROM pg_catalog.pg_namespace\n\
WHERE nspname = 'public'\n\
) AND\n\
version() LIKE 'PostgreSQL 15%'\n\
THEN\n\
IF EXISTS(\n\
SELECT rolname\n\
FROM pg_catalog.pg_roles\n\
WHERE rolname = 'web_access'\n\
)\n\
THEN\n\
GRANT CREATE ON SCHEMA public TO web_access;\n\
END IF;\n\
END IF;\n\
END\n\
$$;"
.to_string();
let grant_query: String = "GRANT CREATE ON SCHEMA public TO web_access".to_string();
info!("grant query for db {} : {}", &db.name, &grant_query);
db_client.simple_query(&grant_query)?;
}

View File

@@ -11,7 +11,7 @@
//! parent timeline, and the last LSN that has been written to disk.
//!
use anyhow::{bail, ensure, Context};
use anyhow::{bail, ensure, Context, Result};
use tokio::sync::watch;
use tracing::*;
use utils::crashsafe::path_with_suffix_extension;
@@ -25,6 +25,7 @@ use std::fs::File;
use std::fs::OpenOptions;
use std::io;
use std::io::Write;
use std::num::NonZeroU64;
use std::ops::Bound::Included;
use std::path::Path;
use std::path::PathBuf;
@@ -291,7 +292,7 @@ impl TimelineUninitMark {
Ok(())
}
fn delete_mark_file_if_present(&mut self) -> anyhow::Result<()> {
fn delete_mark_file_if_present(&mut self) -> Result<(), anyhow::Error> {
let uninit_mark_file = &self.uninit_mark_path;
let uninit_mark_parent = uninit_mark_file
.parent()
@@ -469,7 +470,7 @@ impl Tenant {
horizon: u64,
pitr: Duration,
checkpoint_before_gc: bool,
) -> anyhow::Result<GcResult> {
) -> Result<GcResult> {
let timeline_str = target_timeline_id
.map(|x| x.to_string())
.unwrap_or_else(|| "-".to_string());
@@ -485,7 +486,7 @@ impl Tenant {
/// This function is periodically called by compactor task.
/// Also it can be explicitly requested per timeline through page server
/// api's 'compact' command.
pub fn compaction_iteration(&self) -> anyhow::Result<()> {
pub fn compaction_iteration(&self) -> Result<()> {
// Scan through the hashmap and collect a list of all the timelines,
// while holding the lock. Then drop the lock and actually perform the
// compactions. We don't want to block everything else while the
@@ -509,7 +510,7 @@ impl Tenant {
///
/// Used at graceful shutdown.
///
pub fn checkpoint(&self) -> anyhow::Result<()> {
pub fn checkpoint(&self) -> Result<()> {
// Scan through the hashmap and collect a list of all the timelines,
// while holding the lock. Then drop the lock and actually perform the
// checkpoints. We don't want to block everything else while the
@@ -680,7 +681,7 @@ impl Tenant {
/// before the children.
fn tree_sort_timelines(
timelines: HashMap<TimelineId, TimelineMetadata>,
) -> anyhow::Result<Vec<(TimelineId, TimelineMetadata)>> {
) -> Result<Vec<(TimelineId, TimelineMetadata)>> {
let mut result = Vec::with_capacity(timelines.len());
let mut now = Vec::with_capacity(timelines.len());
@@ -783,6 +784,27 @@ impl Tenant {
.unwrap_or(self.conf.default_tenant_conf.pitr_interval)
}
pub fn get_wal_receiver_connect_timeout(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.walreceiver_connect_timeout
.unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout)
}
pub fn get_lagging_wal_timeout(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.lagging_wal_timeout
.unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout)
}
pub fn get_max_lsn_wal_lag(&self) -> NonZeroU64 {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.max_lsn_wal_lag
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag)
}
pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
self.tenant_conf.write().unwrap().update(&new_tenant_conf);
}
@@ -814,7 +836,7 @@ impl Tenant {
))
}
pub(super) fn new(
pub fn new(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
@@ -837,7 +859,7 @@ impl Tenant {
}
/// Locate and load config
pub(super) fn load_tenant_config(
pub fn load_tenant_config(
conf: &'static PageServerConf,
tenant_id: TenantId,
) -> anyhow::Result<TenantConfOpt> {
@@ -879,7 +901,7 @@ impl Tenant {
Ok(tenant_conf)
}
pub(super) fn persist_tenant_config(
pub fn persist_tenant_config(
target_config_path: &Path,
tenant_conf: TenantConfOpt,
first_save: bool,
@@ -972,7 +994,7 @@ impl Tenant {
horizon: u64,
pitr: Duration,
checkpoint_before_gc: bool,
) -> anyhow::Result<GcResult> {
) -> Result<GcResult> {
let mut totals: GcResult = Default::default();
let now = Instant::now();
@@ -1389,7 +1411,7 @@ fn run_initdb(
conf: &'static PageServerConf,
initdb_target_dir: &Path,
pg_version: u32,
) -> anyhow::Result<()> {
) -> Result<()> {
let initdb_bin_path = conf.pg_bin_dir(pg_version)?.join("initdb");
let initdb_lib_dir = conf.pg_lib_dir(pg_version)?;
info!(
@@ -1435,7 +1457,7 @@ impl Drop for Tenant {
}
}
/// Dump contents of a layer file to stdout.
pub fn dump_layerfile_from_path(path: &Path, verbose: bool) -> anyhow::Result<()> {
pub fn dump_layerfile_from_path(path: &Path, verbose: bool) -> Result<()> {
use std::os::unix::fs::FileExt;
// All layer files start with a two-byte "magic" value, to identify the kind of
@@ -1540,13 +1562,13 @@ pub mod harness {
}
impl<'a> TenantHarness<'a> {
pub fn create(test_name: &'static str) -> anyhow::Result<Self> {
pub fn create(test_name: &'static str) -> Result<Self> {
Self::create_internal(test_name, false)
}
pub fn create_exclusive(test_name: &'static str) -> anyhow::Result<Self> {
pub fn create_exclusive(test_name: &'static str) -> Result<Self> {
Self::create_internal(test_name, true)
}
fn create_internal(test_name: &'static str, exclusive: bool) -> anyhow::Result<Self> {
fn create_internal(test_name: &'static str, exclusive: bool) -> Result<Self> {
let lock_guard = if exclusive {
(None, Some(LOCK.write().unwrap()))
} else {
@@ -1580,7 +1602,7 @@ pub mod harness {
self.try_load().expect("failed to load test tenant")
}
pub fn try_load(&self) -> anyhow::Result<Tenant> {
pub fn try_load(&self) -> Result<Tenant> {
let walredo_mgr = Arc::new(TestRedoManager);
let tenant = Tenant::new(
@@ -1660,7 +1682,7 @@ pub mod harness {
},
records.len()
);
println!("{s}");
println!("{}", s);
Ok(TEST_IMG(&s))
}
@@ -1684,7 +1706,7 @@ mod tests {
Lazy::new(|| Key::from_slice(&hex!("112222222233333333444444445500000001")));
#[test]
fn test_basic() -> anyhow::Result<()> {
fn test_basic() -> Result<()> {
let tenant = TenantHarness::create("test_basic")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -1708,7 +1730,7 @@ mod tests {
}
#[test]
fn no_duplicate_timelines() -> anyhow::Result<()> {
fn no_duplicate_timelines() -> Result<()> {
let tenant = TenantHarness::create("no_duplicate_timelines")?.load();
let _ = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -1739,7 +1761,7 @@ mod tests {
/// Test branch creation
///
#[test]
fn test_branch() -> anyhow::Result<()> {
fn test_branch() -> Result<()> {
let tenant = TenantHarness::create("test_branch")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -1792,7 +1814,7 @@ mod tests {
Ok(())
}
fn make_some_layers(tline: &Timeline, start_lsn: Lsn) -> anyhow::Result<()> {
fn make_some_layers(tline: &Timeline, start_lsn: Lsn) -> Result<()> {
let mut lsn = start_lsn;
#[allow(non_snake_case)]
{
@@ -1834,7 +1856,7 @@ mod tests {
}
#[test]
fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
fn test_prohibit_branch_creation_on_garbage_collected_data() -> Result<()> {
let tenant =
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
.load();
@@ -1866,7 +1888,7 @@ mod tests {
}
#[test]
fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> anyhow::Result<()> {
fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> Result<()> {
let tenant =
TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?.load();
@@ -1893,7 +1915,7 @@ mod tests {
// FIXME: This currently fails to error out. Calling GC doesn't currently
// remove the old value, we'd need to work a little harder
#[test]
fn test_prohibit_get_for_garbage_collected_data() -> anyhow::Result<()> {
fn test_prohibit_get_for_garbage_collected_data() -> Result<()> {
let repo =
RepoHarness::create("test_prohibit_get_for_garbage_collected_data")?
.load();
@@ -1913,7 +1935,7 @@ mod tests {
*/
#[test]
fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
fn test_retain_data_in_parent_which_is_needed_for_child() -> Result<()> {
let tenant =
TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?.load();
let tline = tenant
@@ -1932,7 +1954,7 @@ mod tests {
Ok(())
}
#[test]
fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
fn test_parent_keeps_data_forever_after_branching() -> Result<()> {
let tenant =
TenantHarness::create("test_parent_keeps_data_forever_after_branching")?.load();
let tline = tenant
@@ -1960,7 +1982,7 @@ mod tests {
}
#[test]
fn timeline_load() -> anyhow::Result<()> {
fn timeline_load() -> Result<()> {
const TEST_NAME: &str = "timeline_load";
let harness = TenantHarness::create(TEST_NAME)?;
{
@@ -1981,7 +2003,7 @@ mod tests {
}
#[test]
fn timeline_load_with_ancestor() -> anyhow::Result<()> {
fn timeline_load_with_ancestor() -> Result<()> {
const TEST_NAME: &str = "timeline_load_with_ancestor";
let harness = TenantHarness::create(TEST_NAME)?;
// create two timelines
@@ -2020,7 +2042,7 @@ mod tests {
}
#[test]
fn corrupt_metadata() -> anyhow::Result<()> {
fn corrupt_metadata() -> Result<()> {
const TEST_NAME: &str = "corrupt_metadata";
let harness = TenantHarness::create(TEST_NAME)?;
let tenant = harness.load();
@@ -2062,7 +2084,7 @@ mod tests {
}
#[test]
fn test_images() -> anyhow::Result<()> {
fn test_images() -> Result<()> {
let tenant = TenantHarness::create("test_images")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -2114,7 +2136,7 @@ mod tests {
// repeat 50 times.
//
#[test]
fn test_bulk_insert() -> anyhow::Result<()> {
fn test_bulk_insert() -> Result<()> {
let tenant = TenantHarness::create("test_bulk_insert")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -2156,7 +2178,7 @@ mod tests {
}
#[test]
fn test_random_updates() -> anyhow::Result<()> {
fn test_random_updates() -> Result<()> {
let tenant = TenantHarness::create("test_random_updates")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -2228,7 +2250,7 @@ mod tests {
}
#[test]
fn test_traverse_branches() -> anyhow::Result<()> {
fn test_traverse_branches() -> Result<()> {
let tenant = TenantHarness::create("test_traverse_branches")?.load();
let mut tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -2309,7 +2331,7 @@ mod tests {
}
#[test]
fn test_traverse_ancestors() -> anyhow::Result<()> {
fn test_traverse_ancestors() -> Result<()> {
let tenant = TenantHarness::create("test_traverse_ancestors")?.load();
let mut tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?

View File

@@ -1,6 +1,6 @@
//!
use anyhow::{anyhow, bail, ensure, Context};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::Bytes;
use fail::fail_point;
use itertools::Itertools;
@@ -307,6 +307,10 @@ pub struct GcInfo {
/// Public interface functions
impl Timeline {
//------------------------------------------------------------------------------
// Public GET functions
//------------------------------------------------------------------------------
/// Get the LSN where this branch was created
pub fn get_ancestor_lsn(&self) -> Lsn {
self.ancestor_lsn
@@ -441,7 +445,7 @@ impl Timeline {
&self,
lsn: Lsn,
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
) -> anyhow::Result<()> {
) -> Result<()> {
ensure!(
lsn >= **latest_gc_cutoff_lsn,
"LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
@@ -451,6 +455,12 @@ impl Timeline {
Ok(())
}
//------------------------------------------------------------------------------
// Public PUT functions, to update the repository with new page versions.
//
// These are called by the WAL receiver to digest WAL records.
//------------------------------------------------------------------------------
/// Flush to disk all data that was written with the put_* functions
///
/// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
@@ -469,91 +479,6 @@ impl Timeline {
}
}
pub fn compact(&self) -> anyhow::Result<()> {
let last_record_lsn = self.get_last_record_lsn();
// Last record Lsn could be zero in case the timeline was just created
if !last_record_lsn.is_valid() {
warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
return Ok(());
}
//
// High level strategy for compaction / image creation:
//
// 1. First, calculate the desired "partitioning" of the
// currently in-use key space. The goal is to partition the
// key space into roughly fixed-size chunks, but also take into
// account any existing image layers, and try to align the
// chunk boundaries with the existing image layers to avoid
// too much churn. Also try to align chunk boundaries with
// relation boundaries. In principle, we don't know about
// relation boundaries here, we just deal with key-value
// pairs, and the code in pgdatadir_mapping.rs knows how to
// map relations into key-value pairs. But in practice we know
// that 'field6' is the block number, and the fields 1-5
// identify a relation. This is just an optimization,
// though.
//
// 2. Once we know the partitioning, for each partition,
// decide if it's time to create a new image layer. The
// criterion is: has there been too much "churn" since the last
// image layer? "Churn" is a fuzzy concept; it's a
// combination of too many delta files, or too much WAL in
// total in the delta file. Or perhaps: if creating an image
// file would allow deleting some older files.
//
// 3. After that, we compact all level0 delta files if there
// are too many of them. While compacting, we also garbage
// collect any page versions that are no longer needed because
// of the new image layers we created in step 2.
//
// TODO: This high level strategy hasn't been implemented yet.
// Below are functions compact_level0() and create_image_layers()
// but they are a bit ad hoc and don't quite work like it's explained
// above. Rewrite it.
let _layer_removal_cs = self.layer_removal_cs.lock().unwrap();
let target_file_size = self.get_checkpoint_distance();
// Define partitioning schema if needed
match self.repartition(
self.get_last_record_lsn(),
self.get_compaction_target_size(),
) {
Ok((partitioning, lsn)) => {
// 2. Create new image layers for partitions that have been modified
// "enough".
let layer_paths_to_upload = self.create_image_layers(&partitioning, lsn, false)?;
if !layer_paths_to_upload.is_empty()
&& self.upload_layers.load(atomic::Ordering::Relaxed)
{
storage_sync::schedule_layer_upload(
self.tenant_id,
self.timeline_id,
layer_paths_to_upload,
None,
);
}
// 3. Compact
let timer = self.metrics.compact_time_histo.start_timer();
self.compact_level0(target_file_size)?;
timer.stop_and_record();
}
Err(err) => {
// no partitioning? This is normal, if the timeline was just created
// as an empty timeline. Also in unit tests, when we use the timeline
// as a simple key-value store, ignoring the datadir layout. Log the
// error but continue.
error!("could not compact, repartitioning keyspace failed: {err:?}");
}
};
Ok(())
}
/// Mutate the timeline with a [`TimelineWriter`].
pub fn writer(&self) -> TimelineWriter<'_> {
TimelineWriter {
@@ -561,80 +486,6 @@ impl Timeline {
_write_guard: self.write_lock.lock().unwrap(),
}
}
/// Retrieve current logical size of the timeline.
///
/// The size could be lagging behind the actual number, in case
/// the initial size calculation has not been run (gets triggered on the first size access).
pub fn get_current_logical_size(self: &Arc<Self>) -> anyhow::Result<u64> {
let current_size = self.current_logical_size.current_size()?;
debug!("Current size: {current_size:?}");
let size = current_size.size();
if let (CurrentLogicalSize::Approximate(_), Some(init_lsn)) =
(current_size, self.current_logical_size.initial_part_end)
{
self.try_spawn_size_init_task(init_lsn);
}
Ok(size)
}
/// Check if more than 'checkpoint_distance' of WAL has been accumulated in
/// the in-memory layer, and initiate flushing it if so.
///
/// Also flush after a period of time without new data -- it helps
/// safekeepers to regard pageserver as caught up and suspend activity.
pub fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
let last_lsn = self.get_last_record_lsn();
let layers = self.layers.read().unwrap();
if let Some(open_layer) = &layers.open_layer {
let open_layer_size = open_layer.size()?;
drop(layers);
let last_freeze_at = self.last_freeze_at.load();
let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
let distance = last_lsn.widening_sub(last_freeze_at);
// Checkpointing the open layer can be triggered by layer size or LSN range.
// S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
// we want to stay below that with a big margin. The LSN distance determines how
// much WAL the safekeepers need to store.
if distance >= self.get_checkpoint_distance().into()
|| open_layer_size > self.get_checkpoint_distance()
|| (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
{
info!(
"check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
distance,
open_layer_size,
last_freeze_ts.elapsed()
);
self.freeze_inmem_layer(true);
self.last_freeze_at.store(last_lsn);
*(self.last_freeze_ts.write().unwrap()) = Instant::now();
// Launch a task to flush the frozen layer to disk, unless
// a task was already running. (If the task was running
// at the time that we froze the layer, it must've seen
// the layer we just froze before it exited; see comments
// in flush_frozen_layers())
if let Ok(guard) = self.layer_flush_lock.try_lock() {
drop(guard);
let self_clone = Arc::clone(self);
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::LayerFlushTask,
Some(self.tenant_id),
Some(self.timeline_id),
"layer flush task",
false,
async move { self_clone.flush_frozen_layers(false) },
);
}
}
}
Ok(())
}
}
// Private functions
@@ -678,7 +529,7 @@ impl Timeline {
///
/// Loads the metadata for the timeline into memory, but not the layer map.
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
pub fn new(
conf: &'static PageServerConf,
tenant_conf: Arc<RwLock<TenantConfOpt>>,
metadata: TimelineMetadata,
@@ -751,7 +602,7 @@ impl Timeline {
result
}
pub(super) fn launch_wal_receiver(self: &Arc<Self>) {
pub fn launch_wal_receiver(self: &Arc<Self>) {
if !is_etcd_client_initialized() {
if cfg!(test) {
info!("not launching WAL receiver because etcd client hasn't been initialized");
@@ -790,7 +641,7 @@ impl Timeline {
/// Scan the timeline directory to populate the layer map.
/// Returns all timeline-related files that were found and loaded.
///
pub(super) fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
pub fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
let mut layers = self.layers.write().unwrap();
let mut num_layers = 0;
@@ -876,12 +727,30 @@ impl Timeline {
Ok(())
}
pub(super) fn layer_removal_guard(&self) -> anyhow::Result<MutexGuard<()>> {
pub fn layer_removal_guard(&self) -> anyhow::Result<MutexGuard<()>> {
self.layer_removal_cs
.try_lock()
.map_err(|e| anyhow!("cannot lock compaction critical section {e}"))
}
/// Retrieve current logical size of the timeline.
///
/// The size could be lagging behind the actual number, in case
/// the initial size calculation has not been run (gets triggered on the first size access).
pub fn get_current_logical_size(self: &Arc<Self>) -> anyhow::Result<u64> {
let current_size = self.current_logical_size.current_size()?;
debug!("Current size: {current_size:?}");
let size = current_size.size();
if let (CurrentLogicalSize::Approximate(_), Some(init_lsn)) =
(current_size, self.current_logical_size.initial_part_end)
{
self.try_spawn_size_init_task(init_lsn);
}
Ok(size)
}
fn try_spawn_size_init_task(self: &Arc<Self>, init_lsn: Lsn) {
let timeline_id = self.timeline_id;
@@ -1102,7 +971,7 @@ impl Timeline {
Some((lsn, img))
}
fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
fn get_ancestor_timeline(&self) -> Result<Arc<Timeline>> {
let ancestor = self.ancestor_timeline.as_ref().with_context(|| {
format!(
"Ancestor is missing. Timeline id: {} Ancestor id {:?}",
@@ -1161,14 +1030,14 @@ impl Timeline {
Ok(layer)
}
fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> {
fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> {
//info!("PUT: key {} at {}", key, lsn);
let layer = self.get_layer_for_write(lsn)?;
layer.put_value(key, lsn, val)?;
Ok(())
}
fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> Result<()> {
let layer = self.get_layer_for_write(lsn)?;
layer.put_tombstone(key_range, lsn)?;
@@ -1207,6 +1076,64 @@ impl Timeline {
drop(layers);
}
///
/// Check if more than 'checkpoint_distance' of WAL has been accumulated in
/// the in-memory layer, and initiate flushing it if so.
///
/// Also flush after a period of time without new data -- it helps
/// safekeepers to regard pageserver as caught up and suspend activity.
///
pub fn check_checkpoint_distance(self: &Arc<Timeline>) -> Result<()> {
let last_lsn = self.get_last_record_lsn();
let layers = self.layers.read().unwrap();
if let Some(open_layer) = &layers.open_layer {
let open_layer_size = open_layer.size()?;
drop(layers);
let last_freeze_at = self.last_freeze_at.load();
let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
let distance = last_lsn.widening_sub(last_freeze_at);
// Checkpointing the open layer can be triggered by layer size or LSN range.
// S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
// we want to stay below that with a big margin. The LSN distance determines how
// much WAL the safekeepers need to store.
if distance >= self.get_checkpoint_distance().into()
|| open_layer_size > self.get_checkpoint_distance()
|| (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
{
info!(
"check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
distance,
open_layer_size,
last_freeze_ts.elapsed()
);
self.freeze_inmem_layer(true);
self.last_freeze_at.store(last_lsn);
*(self.last_freeze_ts.write().unwrap()) = Instant::now();
// Launch a task to flush the frozen layer to disk, unless
// a task was already running. (If the task was running
// at the time that we froze the layer, it must've seen
// the layer we just froze before it exited; see comments
// in flush_frozen_layers())
if let Ok(guard) = self.layer_flush_lock.try_lock() {
drop(guard);
let self_clone = Arc::clone(self);
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::LayerFlushTask,
Some(self.tenant_id),
Some(self.timeline_id),
"layer flush task",
false,
async move { self_clone.flush_frozen_layers(false) },
);
}
}
}
Ok(())
}
/// Flush all frozen layers to disk.
///
/// Only one task at a time can be doing layer-flushing for a
@@ -1214,7 +1141,7 @@ impl Timeline {
/// currently doing the flushing, this function will wait for it
/// to finish. If 'wait' is false, this function will return
/// immediately instead.
fn flush_frozen_layers(&self, wait: bool) -> anyhow::Result<()> {
fn flush_frozen_layers(&self, wait: bool) -> Result<()> {
let flush_lock_guard = if wait {
self.layer_flush_lock.lock().unwrap()
} else {
@@ -1253,7 +1180,7 @@ impl Timeline {
}
/// Flush one frozen in-memory layer to disk, as a new delta layer.
fn flush_frozen_layer(&self, frozen_layer: Arc<InMemoryLayer>) -> anyhow::Result<()> {
fn flush_frozen_layer(&self, frozen_layer: Arc<InMemoryLayer>) -> Result<()> {
// As a special case, when we have just imported an image into the repository,
// instead of writing out a L0 delta layer, we directly write out image layer
// files instead. This is possible as long as *all* the data imported into the
@@ -1311,7 +1238,7 @@ impl Timeline {
&self,
disk_consistent_lsn: Lsn,
layer_paths_to_upload: HashMap<PathBuf, LayerFileMetadata>,
) -> anyhow::Result<()> {
) -> Result<()> {
// We can only save a valid 'prev_record_lsn' value on disk if we
// flushed *all* in-memory changes to disk. We only track
// 'prev_record_lsn' in memory for the latest processed record, so we
@@ -1372,7 +1299,7 @@ impl Timeline {
fn create_delta_layer(
&self,
frozen_layer: &InMemoryLayer,
) -> anyhow::Result<(PathBuf, LayerFileMetadata)> {
) -> Result<(PathBuf, LayerFileMetadata)> {
// Write it out
let new_delta = frozen_layer.write_to_disk()?;
let new_delta_path = new_delta.path();
@@ -1407,7 +1334,92 @@ impl Timeline {
Ok((new_delta_path, LayerFileMetadata::new(sz)))
}
fn repartition(&self, lsn: Lsn, partition_size: u64) -> anyhow::Result<(KeyPartitioning, Lsn)> {
pub fn compact(&self) -> anyhow::Result<()> {
let last_record_lsn = self.get_last_record_lsn();
// Last record Lsn could be zero in case the timeline was just created
if !last_record_lsn.is_valid() {
warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
return Ok(());
}
//
// High level strategy for compaction / image creation:
//
// 1. First, calculate the desired "partitioning" of the
// currently in-use key space. The goal is to partition the
// key space into roughly fixed-size chunks, but also take into
// account any existing image layers, and try to align the
// chunk boundaries with the existing image layers to avoid
// too much churn. Also try to align chunk boundaries with
// relation boundaries. In principle, we don't know about
// relation boundaries here, we just deal with key-value
// pairs, and the code in pgdatadir_mapping.rs knows how to
// map relations into key-value pairs. But in practice we know
// that 'field6' is the block number, and the fields 1-5
// identify a relation. This is just an optimization,
// though.
//
// 2. Once we know the partitioning, for each partition,
// decide if it's time to create a new image layer. The
// criterion is: has there been too much "churn" since the last
// image layer? "Churn" is a fuzzy concept; it's a
// combination of too many delta files, or too much WAL in
// total in the delta file. Or perhaps: if creating an image
// file would allow deleting some older files.
//
// 3. After that, we compact all level0 delta files if there
// are too many of them. While compacting, we also garbage
// collect any page versions that are no longer needed because
// of the new image layers we created in step 2.
//
// TODO: This high level strategy hasn't been implemented yet.
// Below are functions compact_level0() and create_image_layers()
// but they are a bit ad hoc and don't quite work like it's explained
// above. Rewrite it.
let _layer_removal_cs = self.layer_removal_cs.lock().unwrap();
let target_file_size = self.get_checkpoint_distance();
// Define partitioning schema if needed
match self.repartition(
self.get_last_record_lsn(),
self.get_compaction_target_size(),
) {
Ok((partitioning, lsn)) => {
// 2. Create new image layers for partitions that have been modified
// "enough".
let layer_paths_to_upload = self.create_image_layers(&partitioning, lsn, false)?;
if !layer_paths_to_upload.is_empty()
&& self.upload_layers.load(atomic::Ordering::Relaxed)
{
storage_sync::schedule_layer_upload(
self.tenant_id,
self.timeline_id,
layer_paths_to_upload,
None,
);
}
// 3. Compact
let timer = self.metrics.compact_time_histo.start_timer();
self.compact_level0(target_file_size)?;
timer.stop_and_record();
}
Err(err) => {
// no partitioning? This is normal, if the timeline was just created
// as an empty timeline. Also in unit tests, when we use the timeline
// as a simple key-value store, ignoring the datadir layout. Log the
// error but continue.
error!("could not compact, repartitioning keyspace failed: {err:?}");
}
};
Ok(())
}
fn repartition(&self, lsn: Lsn, partition_size: u64) -> Result<(KeyPartitioning, Lsn)> {
let mut partitioning_guard = self.partitioning.lock().unwrap();
if partitioning_guard.1 == Lsn(0)
|| lsn.0 - partitioning_guard.1 .0 > self.repartition_threshold
@@ -1421,7 +1433,7 @@ impl Timeline {
}
// Is it time to create a new image layer for the given partition?
fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result<bool> {
fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> Result<bool> {
let layers = self.layers.read().unwrap();
for part_range in &partition.ranges {
@@ -1466,7 +1478,7 @@ impl Timeline {
partitioning: &KeyPartitioning,
lsn: Lsn,
force: bool,
) -> anyhow::Result<HashMap<PathBuf, LayerFileMetadata>> {
) -> Result<HashMap<PathBuf, LayerFileMetadata>> {
let timer = self.metrics.create_images_time_histo.start_timer();
let mut image_layers: Vec<ImageLayer> = Vec::new();
for partition in partitioning.parts.iter() {
@@ -1559,7 +1571,7 @@ impl Timeline {
/// Collect a bunch of Level 0 layer files, and compact and reshuffle them
/// as Level 1 files.
///
fn compact_level0(&self, target_file_size: u64) -> anyhow::Result<()> {
fn compact_level0(&self, target_file_size: u64) -> Result<()> {
let layers = self.layers.read().unwrap();
let mut level0_deltas = layers.get_level0_deltas()?;
drop(layers);
@@ -1869,12 +1881,12 @@ impl Timeline {
///
/// The 'pitr' duration is used to calculate a 'pitr_cutoff', which can be used to determine
/// whether a record is needed for PITR.
pub(super) fn update_gc_info(
pub fn update_gc_info(
&self,
retain_lsns: Vec<Lsn>,
cutoff_horizon: Lsn,
pitr: Duration,
) -> anyhow::Result<()> {
) -> Result<()> {
let mut gc_info = self.gc_info.write().unwrap();
gc_info.horizon_cutoff = cutoff_horizon;
@@ -1929,7 +1941,7 @@ impl Timeline {
/// within a layer file. We can only remove the whole file if it's fully
/// obsolete.
///
pub(super) fn gc(&self) -> anyhow::Result<GcResult> {
pub fn gc(&self) -> Result<GcResult> {
let mut result: GcResult = Default::default();
let now = SystemTime::now();
@@ -2249,11 +2261,11 @@ impl<'a> TimelineWriter<'a> {
///
/// This will implicitly extend the relation, if the page is beyond the
/// current end-of-file.
pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> Result<()> {
self.tl.put_value(key, lsn, value)
}
pub fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
pub fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> Result<()> {
self.tl.put_tombstone(key_range, lsn)
}

View File

@@ -10,6 +10,9 @@ EXTENSION = neon_test_utils
DATA = neon_test_utils--1.0.sql
PGFILEDESC = "neon_test_utils - helpers for neon testing and debugging"
PG_CPPFLAGS = -I$(libpq_srcdir)
SHLIB_LINK_INTERNAL = $(libpq)
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)

View File

@@ -23,6 +23,11 @@ RETURNS bytea
AS 'MODULE_PATHNAME', 'get_raw_page_at_lsn_ex'
LANGUAGE C PARALLEL UNSAFE;
CREATE FUNCTION neon_seqscan_rel(rel regclass, nprefetch int DEFAULT 0)
RETURNS void
AS 'MODULE_PATHNAME', 'neon_seqscan_rel'
LANGUAGE C PARALLEL UNSAFE;
CREATE FUNCTION neon_xlogflush(lsn pg_lsn)
RETURNS VOID
AS 'MODULE_PATHNAME', 'neon_xlogflush'
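For readers unfamiliar with the SQL-to-C binding used here: a declaration
like neon_seqscan_rel above maps onto a C entry point through PostgreSQL's
version-1 calling convention. A bare skeleton of that shape follows; it is
a sketch only (the _skeleton name is hypothetical, and it assumes the
surrounding module already declares PG_MODULE_MAGIC, as neon_test_utils
does).

    #include "postgres.h"
    #include "fmgr.h"

    PG_FUNCTION_INFO_V1(neon_seqscan_rel_skeleton);

    Datum
    neon_seqscan_rel_skeleton(PG_FUNCTION_ARGS)
    {
        Oid   relid = PG_GETARG_OID(0);       /* 'rel regclass' arrives as an Oid */
        int32 nprefetch = PG_GETARG_INT32(1); /* 'nprefetch int' as an int32 */

        (void) relid;
        (void) nprefetch;

        /* A function declared RETURNS void still returns a Datum. */
        PG_RETURN_VOID();
    }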

View File

@@ -23,8 +23,13 @@
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/varlena.h"
#include "utils/wait_event.h"
#include "../neon/pagestore_client.h"
#include "libpq-fe.h"
#include "libpq/pqformat.h"
#include "libpq/libpq.h"
PG_MODULE_MAGIC;
extern void _PG_init(void);
@@ -34,6 +39,7 @@ PG_FUNCTION_INFO_V1(clear_buffer_cache);
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn);
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn_ex);
PG_FUNCTION_INFO_V1(neon_xlogflush);
PG_FUNCTION_INFO_V1(neon_seqscan_rel);
/*
* Linkage to functions in neon module.
@@ -289,6 +295,238 @@ get_raw_page_at_lsn_ex(PG_FUNCTION_ARGS)
}
}
/*
* A wrapper around PQgetCopyData that checks for interrupts while sleeping.
*/
static int
call_PQgetCopyData(PGconn *conn, char **buffer)
{
int ret;
retry:
ret = PQgetCopyData(conn, buffer, 1 /* async */ );
if (ret == 0)
{
int wc;
/* Sleep until there's something to do */
wc = WaitLatchOrSocket(MyLatch,
WL_LATCH_SET | WL_SOCKET_READABLE |
WL_EXIT_ON_PM_DEATH,
PQsocket(conn),
-1L, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/* Data available in socket? */
if (wc & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(conn))
elog(ERROR, "could not get response from pageserver: %s",
PQerrorMessage(conn));
}
goto retry;
}
return ret;
}
static void send_getpage_request(PGconn *pageserver_conn, RelFileNode rnode, BlockNumber blkno, XLogRecPtr lsn);
/*
* Fetch all pages of the given relation. This simulates a sequential scan
* over the table. You can specify the number of blocks to prefetch;
* the function will try to keep that many requests "in flight" at all
* times. The fetched pages are simply discarded.
*/
Datum
neon_seqscan_rel(PG_FUNCTION_ARGS)
{
Oid relid = PG_GETARG_OID(0);
int32 nprefetch = PG_GETARG_INT32(1);
Relation rel;
char *raw_page_data;
BlockNumber nblocks;
PGconn *pageserver_conn;
XLogRecPtr read_lsn;
if (!superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to use raw page functions")));
rel = relation_open(relid, AccessShareLock);
nblocks = RelationGetNumberOfBlocks(rel);
pageserver_conn = PQconnectdb(page_server_connstring);
if (PQstatus(pageserver_conn) == CONNECTION_BAD)
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
PQfinish(pageserver_conn);
ereport(ERROR,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
errmsg("could not establish connection to pageserver"),
errdetail_internal("%s", msg)));
}
PG_TRY();
{
char *query;
int ret;
StringInfoData resp_buff;
read_lsn = GetXLogInsertRecPtr();
query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
ret = PQsendQuery(pageserver_conn, query);
if (ret != 1)
{
PQfinish(pageserver_conn);
pageserver_conn = NULL;
elog(ERROR, "could not send pagestream command to pageserver");
}
while (PQisBusy(pageserver_conn))
{
int wc;
/* Sleep until there's something to do */
wc = WaitLatchOrSocket(MyLatch,
WL_LATCH_SET | WL_SOCKET_READABLE |
WL_EXIT_ON_PM_DEATH,
PQsocket(pageserver_conn),
-1L, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/* Data available in socket? */
if (wc & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(pageserver_conn))
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
PQfinish(pageserver_conn);
pageserver_conn = NULL;
elog(ERROR, "could not complete handshake with pageserver: %s",
msg);
}
}
}
elog(INFO, "scanning %u blocks, prefetch %u", nblocks, nprefetch);
BlockNumber nsent = 0;
for (BlockNumber blkno = 0; blkno < nblocks; blkno++)
{
NeonResponse *resp;
if (blkno % 1024 == 0)
elog(INFO, "blk %u/%u", blkno, nblocks);
if (nsent < blkno + nprefetch + 1 && nsent < nblocks)
{
while (nsent < blkno + nprefetch + 1 && nsent < nblocks)
send_getpage_request(pageserver_conn, rel->rd_node, nsent++, read_lsn);
if (PQflush(pageserver_conn))
{
char *msg = PQerrorMessage(pageserver_conn);
elog(ERROR, "failed to flush page requests: %s", msg);
}
}
/* read response */
resp_buff.len = call_PQgetCopyData(pageserver_conn, &resp_buff.data);
resp_buff.cursor = 0;
if (resp_buff.len < 0)
{
if (resp_buff.len == -1)
elog(ERROR, "end of COPY");
else if (resp_buff.len == -2)
elog(ERROR, "could not read COPY data: %s", PQerrorMessage(pageserver_conn));
}
resp = nm_unpack_response(&resp_buff);
switch (resp->tag)
{
case T_NeonGetPageResponse:
/* ok */
break;
case T_NeonErrorResponse:
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg("could not read block %u", blkno),
errdetail("page server returned error: %s",
((NeonErrorResponse *) resp)->message)));
break;
default:
elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
}
PQfreemem(resp_buff.data);
}
}
PG_CATCH();
{
PQfinish(pageserver_conn);
PG_RE_THROW();
}
PG_END_TRY();
relation_close(rel, AccessShareLock);

/* Close the pageserver connection on the success path, too. */
PQfinish(pageserver_conn);

PG_RETURN_VOID();
}
static void
send_getpage_request(PGconn *pageserver_conn, RelFileNode rnode, BlockNumber blkno, XLogRecPtr lsn)
{
NeonGetPageRequest request = {
.req.tag = T_NeonGetPageRequest,
.req.latest = true,
.req.lsn = lsn,
.rnode = rnode,
.forknum = MAIN_FORKNUM,
.blkno = blkno
};
StringInfoData req_buff;
req_buff = nm_pack_request(&request.req);
/*
* Send request.
*
* In principle, this could block if the output buffer is full, and we
* should use async mode and check for interrupts while waiting. In
* practice, our requests are small enough to always fit in the output and
* TCP buffer.
*/
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
{
char *msg = PQerrorMessage(pageserver_conn);
elog(ERROR, "failed to send page request: %s", msg);
}
pfree(req_buff.data);
}
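The comment above sketches what a fully interrupt-safe sender would have to
do. One possible shape of that async variant is below; it is a hypothetical
helper, not part of this commit, and it relies on the headers this file
already includes.

    /*
     * Wait until the pageserver socket can make progress, staying
     * interruptible. If the socket turned readable, absorb the input so
     * the server is never left blocked waiting on us.
     */
    static void
    wait_for_pageserver_socket(PGconn *conn)
    {
        int wc;

        wc = WaitLatchOrSocket(MyLatch,
                               WL_LATCH_SET | WL_SOCKET_WRITEABLE |
                               WL_SOCKET_READABLE | WL_EXIT_ON_PM_DEATH,
                               PQsocket(conn), -1L, PG_WAIT_EXTENSION);
        ResetLatch(MyLatch);
        CHECK_FOR_INTERRUPTS();

        if ((wc & WL_SOCKET_READABLE) && !PQconsumeInput(conn))
            elog(ERROR, "could not read from pageserver: %s",
                 PQerrorMessage(conn));
    }

    static void
    send_getpage_request_async(PGconn *conn, const char *data, int len)
    {
        int rc;

        if (PQsetnonblocking(conn, 1) != 0)
            elog(ERROR, "failed to set nonblocking mode: %s",
                 PQerrorMessage(conn));

        /*
         * Queue the request. In nonblocking mode, PQputCopyData returns 0
         * instead of blocking when the output buffer is full.
         */
        while ((rc = PQputCopyData(conn, data, len)) == 0)
            wait_for_pageserver_socket(conn);
        if (rc < 0)
            elog(ERROR, "failed to queue page request: %s",
                 PQerrorMessage(conn));

        /* Push queued data out; PQflush returns 1 while output remains. */
        while ((rc = PQflush(conn)) == 1)
            wait_for_pageserver_socket(conn);
        if (rc < 0)
            elog(ERROR, "failed to flush page request: %s",
                 PQerrorMessage(conn));
    }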
/*
* Directly calls XLogFlush(lsn) to flush WAL buffers.
*/