mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-18 10:00:37 +00:00
Compare commits
1 Commits
hotfix_pub
...
seqscan-pe
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d6ec0456e8 |
@@ -1,31 +0,0 @@
|
||||
# Helm chart values for neon-proxy-scram.
|
||||
# This is a YAML-formatted file.
|
||||
|
||||
image:
|
||||
repository: neondatabase/neon
|
||||
|
||||
settings:
|
||||
authBackend: "console"
|
||||
authEndpoint: "http://console-staging.local/management/api/v2"
|
||||
domain: "*.us-east-2.aws.neon.build"
|
||||
|
||||
# -- Additional labels for neon-proxy pods
|
||||
podLabels:
|
||||
zenith_service: proxy-scram
|
||||
zenith_env: dev
|
||||
zenith_region: us-east-2
|
||||
zenith_region_slug: us-east-2
|
||||
|
||||
exposedService:
|
||||
annotations:
|
||||
service.beta.kubernetes.io/aws-load-balancer-type: external
|
||||
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
|
||||
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
|
||||
external-dns.alpha.kubernetes.io/hostname: us-east-2.aws.neon.build
|
||||
|
||||
#metrics:
|
||||
# enabled: true
|
||||
# serviceMonitor:
|
||||
# enabled: true
|
||||
# selector:
|
||||
# release: kube-prometheus-stack
|
||||
28
.github/workflows/build_and_test.yml
vendored
28
.github/workflows/build_and_test.yml
vendored
@@ -825,31 +825,3 @@ jobs:
|
||||
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
|
||||
helm upgrade ${{ matrix.proxy_job }} neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
|
||||
helm upgrade ${{ matrix.proxy_job }}-scram neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
|
||||
|
||||
deploy-proxy-new:
|
||||
runs-on: dev
|
||||
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
|
||||
# Compute image isn't strictly required for proxy deploy, but let's still wait for it to run all deploy jobs consistently.
|
||||
needs: [ push-docker-hub, calculate-deploy-targets, tag, regress-tests ]
|
||||
if: |
|
||||
(github.ref_name == 'main' || github.ref_name == 'release') &&
|
||||
github.event_name != 'workflow_dispatch'
|
||||
defaults:
|
||||
run:
|
||||
shell: bash
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v3
|
||||
with:
|
||||
submodules: true
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Configure environment
|
||||
run: |
|
||||
helm repo add neondatabase https://neondatabase.github.io/helm-charts
|
||||
aws --region us-east-2 eks update-kubeconfig --name dev-us-east-2-beta --role-arn arn:aws:iam::369495373322:role/github-runner
|
||||
|
||||
- name: Re-deploy proxy
|
||||
run: |
|
||||
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
|
||||
helm upgrade neon-proxy-scram neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install -f .github/helm-values/dev-us-east-2-beta.neon-proxy-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
|
||||
|
||||
@@ -425,28 +425,7 @@ pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> {
|
||||
|
||||
// Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user.
|
||||
// This is needed since postgres 15, where this privilege is removed by default.
|
||||
let grant_query = "DO $$\n\
|
||||
BEGIN\n\
|
||||
IF EXISTS(\n\
|
||||
SELECT nspname\n\
|
||||
FROM pg_catalog.pg_namespace\n\
|
||||
WHERE nspname = 'public'\n\
|
||||
) AND\n\
|
||||
version() LIKE 'PostgreSQL 15%'\n\
|
||||
THEN\n\
|
||||
IF EXISTS(\n\
|
||||
SELECT rolname\n\
|
||||
FROM pg_catalog.pg_roles\n\
|
||||
WHERE rolname = 'web_access'\n\
|
||||
)\n\
|
||||
THEN\n\
|
||||
GRANT CREATE ON SCHEMA public TO web_access;\n\
|
||||
END IF;\n\
|
||||
END IF;\n\
|
||||
END\n\
|
||||
$$;"
|
||||
.to_string();
|
||||
|
||||
let grant_query: String = "GRANT CREATE ON SCHEMA public TO web_access".to_string();
|
||||
info!("grant query for db {} : {}", &db.name, &grant_query);
|
||||
db_client.simple_query(&grant_query)?;
|
||||
}
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
//! parent timeline, and the last LSN that has been written to disk.
|
||||
//!
|
||||
|
||||
use anyhow::{bail, ensure, Context};
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use tokio::sync::watch;
|
||||
use tracing::*;
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
@@ -25,6 +25,7 @@ use std::fs::File;
|
||||
use std::fs::OpenOptions;
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
use std::num::NonZeroU64;
|
||||
use std::ops::Bound::Included;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
@@ -291,7 +292,7 @@ impl TimelineUninitMark {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn delete_mark_file_if_present(&mut self) -> anyhow::Result<()> {
|
||||
fn delete_mark_file_if_present(&mut self) -> Result<(), anyhow::Error> {
|
||||
let uninit_mark_file = &self.uninit_mark_path;
|
||||
let uninit_mark_parent = uninit_mark_file
|
||||
.parent()
|
||||
@@ -469,7 +470,7 @@ impl Tenant {
|
||||
horizon: u64,
|
||||
pitr: Duration,
|
||||
checkpoint_before_gc: bool,
|
||||
) -> anyhow::Result<GcResult> {
|
||||
) -> Result<GcResult> {
|
||||
let timeline_str = target_timeline_id
|
||||
.map(|x| x.to_string())
|
||||
.unwrap_or_else(|| "-".to_string());
|
||||
@@ -485,7 +486,7 @@ impl Tenant {
|
||||
/// This function is periodically called by compactor task.
|
||||
/// Also it can be explicitly requested per timeline through page server
|
||||
/// api's 'compact' command.
|
||||
pub fn compaction_iteration(&self) -> anyhow::Result<()> {
|
||||
pub fn compaction_iteration(&self) -> Result<()> {
|
||||
// Scan through the hashmap and collect a list of all the timelines,
|
||||
// while holding the lock. Then drop the lock and actually perform the
|
||||
// compactions. We don't want to block everything else while the
|
||||
@@ -509,7 +510,7 @@ impl Tenant {
|
||||
///
|
||||
/// Used at graceful shutdown.
|
||||
///
|
||||
pub fn checkpoint(&self) -> anyhow::Result<()> {
|
||||
pub fn checkpoint(&self) -> Result<()> {
|
||||
// Scan through the hashmap and collect a list of all the timelines,
|
||||
// while holding the lock. Then drop the lock and actually perform the
|
||||
// checkpoints. We don't want to block everything else while the
|
||||
@@ -680,7 +681,7 @@ impl Tenant {
|
||||
/// before the children.
|
||||
fn tree_sort_timelines(
|
||||
timelines: HashMap<TimelineId, TimelineMetadata>,
|
||||
) -> anyhow::Result<Vec<(TimelineId, TimelineMetadata)>> {
|
||||
) -> Result<Vec<(TimelineId, TimelineMetadata)>> {
|
||||
let mut result = Vec::with_capacity(timelines.len());
|
||||
|
||||
let mut now = Vec::with_capacity(timelines.len());
|
||||
@@ -783,6 +784,27 @@ impl Tenant {
|
||||
.unwrap_or(self.conf.default_tenant_conf.pitr_interval)
|
||||
}
|
||||
|
||||
pub fn get_wal_receiver_connect_timeout(&self) -> Duration {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap();
|
||||
tenant_conf
|
||||
.walreceiver_connect_timeout
|
||||
.unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout)
|
||||
}
|
||||
|
||||
pub fn get_lagging_wal_timeout(&self) -> Duration {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap();
|
||||
tenant_conf
|
||||
.lagging_wal_timeout
|
||||
.unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout)
|
||||
}
|
||||
|
||||
pub fn get_max_lsn_wal_lag(&self) -> NonZeroU64 {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap();
|
||||
tenant_conf
|
||||
.max_lsn_wal_lag
|
||||
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag)
|
||||
}
|
||||
|
||||
pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
|
||||
self.tenant_conf.write().unwrap().update(&new_tenant_conf);
|
||||
}
|
||||
@@ -814,7 +836,7 @@ impl Tenant {
|
||||
))
|
||||
}
|
||||
|
||||
pub(super) fn new(
|
||||
pub fn new(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: TenantConfOpt,
|
||||
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
|
||||
@@ -837,7 +859,7 @@ impl Tenant {
|
||||
}
|
||||
|
||||
/// Locate and load config
|
||||
pub(super) fn load_tenant_config(
|
||||
pub fn load_tenant_config(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
) -> anyhow::Result<TenantConfOpt> {
|
||||
@@ -879,7 +901,7 @@ impl Tenant {
|
||||
Ok(tenant_conf)
|
||||
}
|
||||
|
||||
pub(super) fn persist_tenant_config(
|
||||
pub fn persist_tenant_config(
|
||||
target_config_path: &Path,
|
||||
tenant_conf: TenantConfOpt,
|
||||
first_save: bool,
|
||||
@@ -972,7 +994,7 @@ impl Tenant {
|
||||
horizon: u64,
|
||||
pitr: Duration,
|
||||
checkpoint_before_gc: bool,
|
||||
) -> anyhow::Result<GcResult> {
|
||||
) -> Result<GcResult> {
|
||||
let mut totals: GcResult = Default::default();
|
||||
let now = Instant::now();
|
||||
|
||||
@@ -1389,7 +1411,7 @@ fn run_initdb(
|
||||
conf: &'static PageServerConf,
|
||||
initdb_target_dir: &Path,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<()> {
|
||||
let initdb_bin_path = conf.pg_bin_dir(pg_version)?.join("initdb");
|
||||
let initdb_lib_dir = conf.pg_lib_dir(pg_version)?;
|
||||
info!(
|
||||
@@ -1435,7 +1457,7 @@ impl Drop for Tenant {
|
||||
}
|
||||
}
|
||||
/// Dump contents of a layer file to stdout.
|
||||
pub fn dump_layerfile_from_path(path: &Path, verbose: bool) -> anyhow::Result<()> {
|
||||
pub fn dump_layerfile_from_path(path: &Path, verbose: bool) -> Result<()> {
|
||||
use std::os::unix::fs::FileExt;
|
||||
|
||||
// All layer files start with a two-byte "magic" value, to identify the kind of
|
||||
@@ -1540,13 +1562,13 @@ pub mod harness {
|
||||
}
|
||||
|
||||
impl<'a> TenantHarness<'a> {
|
||||
pub fn create(test_name: &'static str) -> anyhow::Result<Self> {
|
||||
pub fn create(test_name: &'static str) -> Result<Self> {
|
||||
Self::create_internal(test_name, false)
|
||||
}
|
||||
pub fn create_exclusive(test_name: &'static str) -> anyhow::Result<Self> {
|
||||
pub fn create_exclusive(test_name: &'static str) -> Result<Self> {
|
||||
Self::create_internal(test_name, true)
|
||||
}
|
||||
fn create_internal(test_name: &'static str, exclusive: bool) -> anyhow::Result<Self> {
|
||||
fn create_internal(test_name: &'static str, exclusive: bool) -> Result<Self> {
|
||||
let lock_guard = if exclusive {
|
||||
(None, Some(LOCK.write().unwrap()))
|
||||
} else {
|
||||
@@ -1580,7 +1602,7 @@ pub mod harness {
|
||||
self.try_load().expect("failed to load test tenant")
|
||||
}
|
||||
|
||||
pub fn try_load(&self) -> anyhow::Result<Tenant> {
|
||||
pub fn try_load(&self) -> Result<Tenant> {
|
||||
let walredo_mgr = Arc::new(TestRedoManager);
|
||||
|
||||
let tenant = Tenant::new(
|
||||
@@ -1660,7 +1682,7 @@ pub mod harness {
|
||||
},
|
||||
records.len()
|
||||
);
|
||||
println!("{s}");
|
||||
println!("{}", s);
|
||||
|
||||
Ok(TEST_IMG(&s))
|
||||
}
|
||||
@@ -1684,7 +1706,7 @@ mod tests {
|
||||
Lazy::new(|| Key::from_slice(&hex!("112222222233333333444444445500000001")));
|
||||
|
||||
#[test]
|
||||
fn test_basic() -> anyhow::Result<()> {
|
||||
fn test_basic() -> Result<()> {
|
||||
let tenant = TenantHarness::create("test_basic")?.load();
|
||||
let tline = tenant
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
|
||||
@@ -1708,7 +1730,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn no_duplicate_timelines() -> anyhow::Result<()> {
|
||||
fn no_duplicate_timelines() -> Result<()> {
|
||||
let tenant = TenantHarness::create("no_duplicate_timelines")?.load();
|
||||
let _ = tenant
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
|
||||
@@ -1739,7 +1761,7 @@ mod tests {
|
||||
/// Test branch creation
|
||||
///
|
||||
#[test]
|
||||
fn test_branch() -> anyhow::Result<()> {
|
||||
fn test_branch() -> Result<()> {
|
||||
let tenant = TenantHarness::create("test_branch")?.load();
|
||||
let tline = tenant
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
|
||||
@@ -1792,7 +1814,7 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn make_some_layers(tline: &Timeline, start_lsn: Lsn) -> anyhow::Result<()> {
|
||||
fn make_some_layers(tline: &Timeline, start_lsn: Lsn) -> Result<()> {
|
||||
let mut lsn = start_lsn;
|
||||
#[allow(non_snake_case)]
|
||||
{
|
||||
@@ -1834,7 +1856,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
|
||||
fn test_prohibit_branch_creation_on_garbage_collected_data() -> Result<()> {
|
||||
let tenant =
|
||||
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
|
||||
.load();
|
||||
@@ -1866,7 +1888,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> anyhow::Result<()> {
|
||||
fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> Result<()> {
|
||||
let tenant =
|
||||
TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?.load();
|
||||
|
||||
@@ -1893,7 +1915,7 @@ mod tests {
|
||||
// FIXME: This currently fails to error out. Calling GC doesn't currently
|
||||
// remove the old value, we'd need to work a little harder
|
||||
#[test]
|
||||
fn test_prohibit_get_for_garbage_collected_data() -> anyhow::Result<()> {
|
||||
fn test_prohibit_get_for_garbage_collected_data() -> Result<()> {
|
||||
let repo =
|
||||
RepoHarness::create("test_prohibit_get_for_garbage_collected_data")?
|
||||
.load();
|
||||
@@ -1913,7 +1935,7 @@ mod tests {
|
||||
*/
|
||||
|
||||
#[test]
|
||||
fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
|
||||
fn test_retain_data_in_parent_which_is_needed_for_child() -> Result<()> {
|
||||
let tenant =
|
||||
TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?.load();
|
||||
let tline = tenant
|
||||
@@ -1932,7 +1954,7 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
#[test]
|
||||
fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
|
||||
fn test_parent_keeps_data_forever_after_branching() -> Result<()> {
|
||||
let tenant =
|
||||
TenantHarness::create("test_parent_keeps_data_forever_after_branching")?.load();
|
||||
let tline = tenant
|
||||
@@ -1960,7 +1982,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn timeline_load() -> anyhow::Result<()> {
|
||||
fn timeline_load() -> Result<()> {
|
||||
const TEST_NAME: &str = "timeline_load";
|
||||
let harness = TenantHarness::create(TEST_NAME)?;
|
||||
{
|
||||
@@ -1981,7 +2003,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn timeline_load_with_ancestor() -> anyhow::Result<()> {
|
||||
fn timeline_load_with_ancestor() -> Result<()> {
|
||||
const TEST_NAME: &str = "timeline_load_with_ancestor";
|
||||
let harness = TenantHarness::create(TEST_NAME)?;
|
||||
// create two timelines
|
||||
@@ -2020,7 +2042,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn corrupt_metadata() -> anyhow::Result<()> {
|
||||
fn corrupt_metadata() -> Result<()> {
|
||||
const TEST_NAME: &str = "corrupt_metadata";
|
||||
let harness = TenantHarness::create(TEST_NAME)?;
|
||||
let tenant = harness.load();
|
||||
@@ -2062,7 +2084,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_images() -> anyhow::Result<()> {
|
||||
fn test_images() -> Result<()> {
|
||||
let tenant = TenantHarness::create("test_images")?.load();
|
||||
let tline = tenant
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
|
||||
@@ -2114,7 +2136,7 @@ mod tests {
|
||||
// repeat 50 times.
|
||||
//
|
||||
#[test]
|
||||
fn test_bulk_insert() -> anyhow::Result<()> {
|
||||
fn test_bulk_insert() -> Result<()> {
|
||||
let tenant = TenantHarness::create("test_bulk_insert")?.load();
|
||||
let tline = tenant
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
|
||||
@@ -2156,7 +2178,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_random_updates() -> anyhow::Result<()> {
|
||||
fn test_random_updates() -> Result<()> {
|
||||
let tenant = TenantHarness::create("test_random_updates")?.load();
|
||||
let tline = tenant
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
|
||||
@@ -2228,7 +2250,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_traverse_branches() -> anyhow::Result<()> {
|
||||
fn test_traverse_branches() -> Result<()> {
|
||||
let tenant = TenantHarness::create("test_traverse_branches")?.load();
|
||||
let mut tline = tenant
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
|
||||
@@ -2309,7 +2331,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_traverse_ancestors() -> anyhow::Result<()> {
|
||||
fn test_traverse_ancestors() -> Result<()> {
|
||||
let tenant = TenantHarness::create("test_traverse_ancestors")?.load();
|
||||
let mut tline = tenant
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
//!
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, Context};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::Bytes;
|
||||
use fail::fail_point;
|
||||
use itertools::Itertools;
|
||||
@@ -307,6 +307,10 @@ pub struct GcInfo {
|
||||
|
||||
/// Public interface functions
|
||||
impl Timeline {
|
||||
//------------------------------------------------------------------------------
|
||||
// Public GET functions
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
/// Get the LSN where this branch was created
|
||||
pub fn get_ancestor_lsn(&self) -> Lsn {
|
||||
self.ancestor_lsn
|
||||
@@ -441,7 +445,7 @@ impl Timeline {
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<()> {
|
||||
ensure!(
|
||||
lsn >= **latest_gc_cutoff_lsn,
|
||||
"LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
|
||||
@@ -451,6 +455,12 @@ impl Timeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Public PUT functions, to update the repository with new page versions.
|
||||
//
|
||||
// These are called by the WAL receiver to digest WAL records.
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
/// Flush to disk all data that was written with the put_* functions
|
||||
///
|
||||
/// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
|
||||
@@ -469,91 +479,6 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn compact(&self) -> anyhow::Result<()> {
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
|
||||
// Last record Lsn could be zero in case the timeline was just created
|
||||
if !last_record_lsn.is_valid() {
|
||||
warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
//
|
||||
// High level strategy for compaction / image creation:
|
||||
//
|
||||
// 1. First, calculate the desired "partitioning" of the
|
||||
// currently in-use key space. The goal is to partition the
|
||||
// key space into roughly fixed-size chunks, but also take into
|
||||
// account any existing image layers, and try to align the
|
||||
// chunk boundaries with the existing image layers to avoid
|
||||
// too much churn. Also try to align chunk boundaries with
|
||||
// relation boundaries. In principle, we don't know about
|
||||
// relation boundaries here, we just deal with key-value
|
||||
// pairs, and the code in pgdatadir_mapping.rs knows how to
|
||||
// map relations into key-value pairs. But in practice we know
|
||||
// that 'field6' is the block number, and the fields 1-5
|
||||
// identify a relation. This is just an optimization,
|
||||
// though.
|
||||
//
|
||||
// 2. Once we know the partitioning, for each partition,
|
||||
// decide if it's time to create a new image layer. The
|
||||
// criterion is: has there been too much "churn" since the last
|
||||
// image layer? The "churn" is a fuzzy concept, it's a
|
||||
// combination of too many delta files, or too much WAL in
|
||||
// total in the delta file. Or perhaps: if creating an image
|
||||
// file would allow deleting some older files.
|
||||
//
|
||||
// 3. After that, we compact all level0 delta files if there
|
||||
// are too many of them. While compacting, we also garbage
|
||||
// collect any page versions that are no longer needed because
|
||||
// of the new image layers we created in step 2.
|
||||
//
|
||||
// TODO: This high level strategy hasn't been implemented yet.
|
||||
// Below are functions compact_level0() and create_image_layers()
|
||||
// but they are a bit ad hoc and don't quite work like it's explained
|
||||
// above. Rewrite it.
|
||||
let _layer_removal_cs = self.layer_removal_cs.lock().unwrap();
|
||||
|
||||
let target_file_size = self.get_checkpoint_distance();
|
||||
|
||||
// Define partitioning schema if needed
|
||||
|
||||
match self.repartition(
|
||||
self.get_last_record_lsn(),
|
||||
self.get_compaction_target_size(),
|
||||
) {
|
||||
Ok((partitioning, lsn)) => {
|
||||
// 2. Create new image layers for partitions that have been modified
|
||||
// "enough".
|
||||
let layer_paths_to_upload = self.create_image_layers(&partitioning, lsn, false)?;
|
||||
if !layer_paths_to_upload.is_empty()
|
||||
&& self.upload_layers.load(atomic::Ordering::Relaxed)
|
||||
{
|
||||
storage_sync::schedule_layer_upload(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
layer_paths_to_upload,
|
||||
None,
|
||||
);
|
||||
}
|
||||
|
||||
// 3. Compact
|
||||
let timer = self.metrics.compact_time_histo.start_timer();
|
||||
self.compact_level0(target_file_size)?;
|
||||
timer.stop_and_record();
|
||||
}
|
||||
Err(err) => {
|
||||
// no partitioning? This is normal, if the timeline was just created
|
||||
// as an empty timeline. Also in unit tests, when we use the timeline
|
||||
// as a simple key-value store, ignoring the datadir layout. Log the
|
||||
// error but continue.
|
||||
error!("could not compact, repartitioning keyspace failed: {err:?}");
|
||||
}
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Mutate the timeline with a [`TimelineWriter`].
|
||||
pub fn writer(&self) -> TimelineWriter<'_> {
|
||||
TimelineWriter {
|
||||
@@ -561,80 +486,6 @@ impl Timeline {
|
||||
_write_guard: self.write_lock.lock().unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Retrieve current logical size of the timeline.
|
||||
///
|
||||
/// The size could be lagging behind the actual number, in case
|
||||
/// the initial size calculation has not been run (gets triggered on the first size access).
|
||||
pub fn get_current_logical_size(self: &Arc<Self>) -> anyhow::Result<u64> {
|
||||
let current_size = self.current_logical_size.current_size()?;
|
||||
debug!("Current size: {current_size:?}");
|
||||
|
||||
let size = current_size.size();
|
||||
if let (CurrentLogicalSize::Approximate(_), Some(init_lsn)) =
|
||||
(current_size, self.current_logical_size.initial_part_end)
|
||||
{
|
||||
self.try_spawn_size_init_task(init_lsn);
|
||||
}
|
||||
|
||||
Ok(size)
|
||||
}
|
||||
|
||||
/// Check if more than 'checkpoint_distance' of WAL has been accumulated in
|
||||
/// the in-memory layer, and initiate flushing it if so.
|
||||
///
|
||||
/// Also flush after a period of time without new data -- it helps
|
||||
/// safekeepers to regard pageserver as caught up and suspend activity.
|
||||
pub fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
|
||||
let last_lsn = self.get_last_record_lsn();
|
||||
let layers = self.layers.read().unwrap();
|
||||
if let Some(open_layer) = &layers.open_layer {
|
||||
let open_layer_size = open_layer.size()?;
|
||||
drop(layers);
|
||||
let last_freeze_at = self.last_freeze_at.load();
|
||||
let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
|
||||
let distance = last_lsn.widening_sub(last_freeze_at);
|
||||
// Checkpointing the open layer can be triggered by layer size or LSN range.
|
||||
// S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
|
||||
// we want to stay below that with a big margin. The LSN distance determines how
|
||||
// much WAL the safekeepers need to store.
|
||||
if distance >= self.get_checkpoint_distance().into()
|
||||
|| open_layer_size > self.get_checkpoint_distance()
|
||||
|| (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
|
||||
{
|
||||
info!(
|
||||
"check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
|
||||
distance,
|
||||
open_layer_size,
|
||||
last_freeze_ts.elapsed()
|
||||
);
|
||||
|
||||
self.freeze_inmem_layer(true);
|
||||
self.last_freeze_at.store(last_lsn);
|
||||
*(self.last_freeze_ts.write().unwrap()) = Instant::now();
|
||||
|
||||
// Launch a task to flush the frozen layer to disk, unless
|
||||
// a task was already running. (If the task was running
|
||||
// at the time that we froze the layer, it must've seen the
|
||||
// layer we just froze before it exited; see comments
|
||||
// in flush_frozen_layers())
|
||||
if let Ok(guard) = self.layer_flush_lock.try_lock() {
|
||||
drop(guard);
|
||||
let self_clone = Arc::clone(self);
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
task_mgr::TaskKind::LayerFlushTask,
|
||||
Some(self.tenant_id),
|
||||
Some(self.timeline_id),
|
||||
"layer flush task",
|
||||
false,
|
||||
async move { self_clone.flush_frozen_layers(false) },
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// Private functions
|
||||
@@ -678,7 +529,7 @@ impl Timeline {
|
||||
///
|
||||
/// Loads the metadata for the timeline into memory, but not the layer map.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(super) fn new(
|
||||
pub fn new(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: Arc<RwLock<TenantConfOpt>>,
|
||||
metadata: TimelineMetadata,
|
||||
@@ -751,7 +602,7 @@ impl Timeline {
|
||||
result
|
||||
}
|
||||
|
||||
pub(super) fn launch_wal_receiver(self: &Arc<Self>) {
|
||||
pub fn launch_wal_receiver(self: &Arc<Self>) {
|
||||
if !is_etcd_client_initialized() {
|
||||
if cfg!(test) {
|
||||
info!("not launching WAL receiver because etcd client hasn't been initialized");
|
||||
@@ -790,7 +641,7 @@ impl Timeline {
|
||||
/// Scan the timeline directory to populate the layer map.
|
||||
/// Returns all timeline-related files that were found and loaded.
|
||||
///
|
||||
pub(super) fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
|
||||
pub fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut num_layers = 0;
|
||||
|
||||
@@ -876,12 +727,30 @@ impl Timeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(super) fn layer_removal_guard(&self) -> anyhow::Result<MutexGuard<()>> {
|
||||
pub fn layer_removal_guard(&self) -> anyhow::Result<MutexGuard<()>> {
|
||||
self.layer_removal_cs
|
||||
.try_lock()
|
||||
.map_err(|e| anyhow!("cannot lock compaction critical section {e}"))
|
||||
}
|
||||
|
||||
/// Retrieve current logical size of the timeline.
|
||||
///
|
||||
/// The size could be lagging behind the actual number, in case
|
||||
/// the initial size calculation has not been run (gets triggered on the first size access).
|
||||
pub fn get_current_logical_size(self: &Arc<Self>) -> anyhow::Result<u64> {
|
||||
let current_size = self.current_logical_size.current_size()?;
|
||||
debug!("Current size: {current_size:?}");
|
||||
|
||||
let size = current_size.size();
|
||||
if let (CurrentLogicalSize::Approximate(_), Some(init_lsn)) =
|
||||
(current_size, self.current_logical_size.initial_part_end)
|
||||
{
|
||||
self.try_spawn_size_init_task(init_lsn);
|
||||
}
|
||||
|
||||
Ok(size)
|
||||
}
|
||||
|
||||
fn try_spawn_size_init_task(self: &Arc<Self>, init_lsn: Lsn) {
|
||||
let timeline_id = self.timeline_id;
|
||||
|
||||
@@ -1102,7 +971,7 @@ impl Timeline {
|
||||
Some((lsn, img))
|
||||
}
|
||||
|
||||
fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
|
||||
fn get_ancestor_timeline(&self) -> Result<Arc<Timeline>> {
|
||||
let ancestor = self.ancestor_timeline.as_ref().with_context(|| {
|
||||
format!(
|
||||
"Ancestor is missing. Timeline id: {} Ancestor id {:?}",
|
||||
@@ -1161,14 +1030,14 @@ impl Timeline {
|
||||
Ok(layer)
|
||||
}
|
||||
|
||||
fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> {
|
||||
fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> {
|
||||
//info!("PUT: key {} at {}", key, lsn);
|
||||
let layer = self.get_layer_for_write(lsn)?;
|
||||
layer.put_value(key, lsn, val)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
|
||||
fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> Result<()> {
|
||||
let layer = self.get_layer_for_write(lsn)?;
|
||||
layer.put_tombstone(key_range, lsn)?;
|
||||
|
||||
@@ -1207,6 +1076,64 @@ impl Timeline {
|
||||
drop(layers);
|
||||
}
|
||||
|
||||
///
|
||||
/// Check if more than 'checkpoint_distance' of WAL has been accumulated in
|
||||
/// the in-memory layer, and initiate flushing it if so.
|
||||
///
|
||||
/// Also flush after a period of time without new data -- it helps
|
||||
/// safekeepers to regard pageserver as caught up and suspend activity.
|
||||
///
|
||||
pub fn check_checkpoint_distance(self: &Arc<Timeline>) -> Result<()> {
|
||||
let last_lsn = self.get_last_record_lsn();
|
||||
let layers = self.layers.read().unwrap();
|
||||
if let Some(open_layer) = &layers.open_layer {
|
||||
let open_layer_size = open_layer.size()?;
|
||||
drop(layers);
|
||||
let last_freeze_at = self.last_freeze_at.load();
|
||||
let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
|
||||
let distance = last_lsn.widening_sub(last_freeze_at);
|
||||
// Checkpointing the open layer can be triggered by layer size or LSN range.
|
||||
// S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
|
||||
// we want to stay below that with a big margin. The LSN distance determines how
|
||||
// much WAL the safekeepers need to store.
|
||||
if distance >= self.get_checkpoint_distance().into()
|
||||
|| open_layer_size > self.get_checkpoint_distance()
|
||||
|| (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
|
||||
{
|
||||
info!(
|
||||
"check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
|
||||
distance,
|
||||
open_layer_size,
|
||||
last_freeze_ts.elapsed()
|
||||
);
|
||||
|
||||
self.freeze_inmem_layer(true);
|
||||
self.last_freeze_at.store(last_lsn);
|
||||
*(self.last_freeze_ts.write().unwrap()) = Instant::now();
|
||||
|
||||
// Launch a task to flush the frozen layer to disk, unless
|
||||
// a task was already running. (If the task was running
|
||||
// at the time that we froze the layer, it must've seen the
|
||||
// layer we just froze before it exited; see comments
|
||||
// in flush_frozen_layers())
|
||||
if let Ok(guard) = self.layer_flush_lock.try_lock() {
|
||||
drop(guard);
|
||||
let self_clone = Arc::clone(self);
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
task_mgr::TaskKind::LayerFlushTask,
|
||||
Some(self.tenant_id),
|
||||
Some(self.timeline_id),
|
||||
"layer flush task",
|
||||
false,
|
||||
async move { self_clone.flush_frozen_layers(false) },
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Flush all frozen layers to disk.
|
||||
///
|
||||
/// Only one task at a time can be doing layer-flushing for a
|
||||
@@ -1214,7 +1141,7 @@ impl Timeline {
|
||||
/// currently doing the flushing, this function will wait for it
|
||||
/// to finish. If 'wait' is false, this function will return
|
||||
/// immediately instead.
|
||||
fn flush_frozen_layers(&self, wait: bool) -> anyhow::Result<()> {
|
||||
fn flush_frozen_layers(&self, wait: bool) -> Result<()> {
|
||||
let flush_lock_guard = if wait {
|
||||
self.layer_flush_lock.lock().unwrap()
|
||||
} else {
|
||||
@@ -1253,7 +1180,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Flush one frozen in-memory layer to disk, as a new delta layer.
|
||||
fn flush_frozen_layer(&self, frozen_layer: Arc<InMemoryLayer>) -> anyhow::Result<()> {
|
||||
fn flush_frozen_layer(&self, frozen_layer: Arc<InMemoryLayer>) -> Result<()> {
|
||||
// As a special case, when we have just imported an image into the repository,
|
||||
// instead of writing out a L0 delta layer, we directly write out image layer
|
||||
// files instead. This is possible as long as *all* the data imported into the
|
||||
@@ -1311,7 +1238,7 @@ impl Timeline {
|
||||
&self,
|
||||
disk_consistent_lsn: Lsn,
|
||||
layer_paths_to_upload: HashMap<PathBuf, LayerFileMetadata>,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<()> {
|
||||
// We can only save a valid 'prev_record_lsn' value on disk if we
|
||||
// flushed *all* in-memory changes to disk. We only track
|
||||
// 'prev_record_lsn' in memory for the latest processed record, so we
|
||||
@@ -1372,7 +1299,7 @@ impl Timeline {
|
||||
fn create_delta_layer(
|
||||
&self,
|
||||
frozen_layer: &InMemoryLayer,
|
||||
) -> anyhow::Result<(PathBuf, LayerFileMetadata)> {
|
||||
) -> Result<(PathBuf, LayerFileMetadata)> {
|
||||
// Write it out
|
||||
let new_delta = frozen_layer.write_to_disk()?;
|
||||
let new_delta_path = new_delta.path();
|
||||
@@ -1407,7 +1334,92 @@ impl Timeline {
|
||||
Ok((new_delta_path, LayerFileMetadata::new(sz)))
|
||||
}
|
||||
|
||||
fn repartition(&self, lsn: Lsn, partition_size: u64) -> anyhow::Result<(KeyPartitioning, Lsn)> {
|
||||
pub fn compact(&self) -> anyhow::Result<()> {
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
|
||||
// Last record Lsn could be zero in case the timelie was just created
|
||||
if !last_record_lsn.is_valid() {
|
||||
warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
//
|
||||
// High level strategy for compaction / image creation:
|
||||
//
|
||||
// 1. First, calculate the desired "partitioning" of the
|
||||
// currently in-use key space. The goal is to partition the
|
||||
// key space into roughly fixed-size chunks, but also take into
|
||||
// account any existing image layers, and try to align the
|
||||
// chunk boundaries with the existing image layers to avoid
|
||||
// too much churn. Also try to align chunk boundaries with
|
||||
// relation boundaries. In principle, we don't know about
|
||||
// relation boundaries here, we just deal with key-value
|
||||
// pairs, and the code in pgdatadir_mapping.rs knows how to
|
||||
// map relations into key-value pairs. But in practice we know
|
||||
// that 'field6' is the block number, and the fields 1-5
|
||||
// identify a relation. This is just an optimization,
|
||||
// though.
|
||||
//
|
||||
// 2. Once we know the partitioning, for each partition,
|
||||
// decide if it's time to create a new image layer. The
|
||||
// criteria is: there has been too much "churn" since the last
|
||||
// image layer? The "churn" is fuzzy concept, it's a
|
||||
// combination of too many delta files, or too much WAL in
|
||||
// total in the delta file. Or perhaps: if creating an image
|
||||
// file would allow to delete some older files.
|
||||
//
|
||||
// 3. After that, we compact all level0 delta files if there
|
||||
// are too many of them. While compacting, we also garbage
|
||||
// collect any page versions that are no longer needed because
|
||||
// of the new image layers we created in step 2.
|
||||
//
|
||||
// TODO: This high level strategy hasn't been implemented yet.
|
||||
// Below are functions compact_level0() and create_image_layers()
|
||||
// but they are a bit ad hoc and don't quite work like it's explained
|
||||
// above. Rewrite it.
|
||||
let _layer_removal_cs = self.layer_removal_cs.lock().unwrap();
|
||||
|
||||
let target_file_size = self.get_checkpoint_distance();
|
||||
|
||||
// Define partitioning schema if needed
|
||||
|
||||
match self.repartition(
|
||||
self.get_last_record_lsn(),
|
||||
self.get_compaction_target_size(),
|
||||
) {
|
||||
Ok((partitioning, lsn)) => {
|
||||
// 2. Create new image layers for partitions that have been modified
|
||||
// "enough".
|
||||
let layer_paths_to_upload = self.create_image_layers(&partitioning, lsn, false)?;
|
||||
if !layer_paths_to_upload.is_empty()
|
||||
&& self.upload_layers.load(atomic::Ordering::Relaxed)
|
||||
{
|
||||
storage_sync::schedule_layer_upload(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
layer_paths_to_upload,
|
||||
None,
|
||||
);
|
||||
}
|
||||
|
||||
// 3. Compact
|
||||
let timer = self.metrics.compact_time_histo.start_timer();
|
||||
self.compact_level0(target_file_size)?;
|
||||
timer.stop_and_record();
|
||||
}
|
||||
Err(err) => {
|
||||
// no partitioning? This is normal, if the timeline was just created
|
||||
// as an empty timeline. Also in unit tests, when we use the timeline
|
||||
// as a simple key-value store, ignoring the datadir layout. Log the
|
||||
// error but continue.
|
||||
error!("could not compact, repartitioning keyspace failed: {err:?}");
|
||||
}
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn repartition(&self, lsn: Lsn, partition_size: u64) -> Result<(KeyPartitioning, Lsn)> {
|
||||
let mut partitioning_guard = self.partitioning.lock().unwrap();
|
||||
if partitioning_guard.1 == Lsn(0)
|
||||
|| lsn.0 - partitioning_guard.1 .0 > self.repartition_threshold
|
||||
@@ -1421,7 +1433,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// Is it time to create a new image layer for the given partition?
|
||||
fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result<bool> {
|
||||
fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> Result<bool> {
|
||||
let layers = self.layers.read().unwrap();
|
||||
|
||||
for part_range in &partition.ranges {
|
||||
@@ -1466,7 +1478,7 @@ impl Timeline {
|
||||
partitioning: &KeyPartitioning,
|
||||
lsn: Lsn,
|
||||
force: bool,
|
||||
) -> anyhow::Result<HashMap<PathBuf, LayerFileMetadata>> {
|
||||
) -> Result<HashMap<PathBuf, LayerFileMetadata>> {
|
||||
let timer = self.metrics.create_images_time_histo.start_timer();
|
||||
let mut image_layers: Vec<ImageLayer> = Vec::new();
|
||||
for partition in partitioning.parts.iter() {
|
||||
@@ -1559,7 +1571,7 @@ impl Timeline {
|
||||
/// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
|
||||
/// Level 1 files.
|
||||
///
|
||||
fn compact_level0(&self, target_file_size: u64) -> anyhow::Result<()> {
|
||||
fn compact_level0(&self, target_file_size: u64) -> Result<()> {
|
||||
let layers = self.layers.read().unwrap();
|
||||
let mut level0_deltas = layers.get_level0_deltas()?;
|
||||
drop(layers);
|
||||
@@ -1869,12 +1881,12 @@ impl Timeline {
|
||||
///
|
||||
/// The 'pitr' duration is used to calculate a 'pitr_cutoff', which can be used to determine
|
||||
/// whether a record is needed for PITR.
|
||||
pub(super) fn update_gc_info(
|
||||
pub fn update_gc_info(
|
||||
&self,
|
||||
retain_lsns: Vec<Lsn>,
|
||||
cutoff_horizon: Lsn,
|
||||
pitr: Duration,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<()> {
|
||||
let mut gc_info = self.gc_info.write().unwrap();
|
||||
|
||||
gc_info.horizon_cutoff = cutoff_horizon;
|
||||
@@ -1929,7 +1941,7 @@ impl Timeline {
|
||||
/// within a layer file. We can only remove the whole file if it's fully
|
||||
/// obsolete.
|
||||
///
|
||||
pub(super) fn gc(&self) -> anyhow::Result<GcResult> {
|
||||
pub fn gc(&self) -> Result<GcResult> {
|
||||
let mut result: GcResult = Default::default();
|
||||
let now = SystemTime::now();
|
||||
|
||||
@@ -2249,11 +2261,11 @@ impl<'a> TimelineWriter<'a> {
|
||||
///
|
||||
/// This will implicitly extend the relation, if the page is beyond the
|
||||
/// current end-of-file.
|
||||
pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
|
||||
pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> Result<()> {
|
||||
self.tl.put_value(key, lsn, value)
|
||||
}
|
||||
|
||||
pub fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
|
||||
pub fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> Result<()> {
|
||||
self.tl.put_tombstone(key_range, lsn)
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,9 @@ EXTENSION = neon_test_utils
|
||||
DATA = neon_test_utils--1.0.sql
|
||||
PGFILEDESC = "neon_test_utils - helpers for neon testing and debugging"
|
||||
|
||||
PG_CPPFLAGS = -I$(libpq_srcdir)
|
||||
SHLIB_LINK_INTERNAL = $(libpq)
|
||||
|
||||
PG_CONFIG = pg_config
|
||||
PGXS := $(shell $(PG_CONFIG) --pgxs)
|
||||
include $(PGXS)
|
||||
|
||||
@@ -23,6 +23,11 @@ RETURNS bytea
|
||||
AS 'MODULE_PATHNAME', 'get_raw_page_at_lsn_ex'
|
||||
LANGUAGE C PARALLEL UNSAFE;
|
||||
|
||||
-- neon_seqscan_rel(rel, nprefetch): SQL binding for the C function
-- 'neon_seqscan_rel' in this extension's shared library. Takes the relation
-- to scan and an optional prefetch count (defaults to 0); returns nothing.
CREATE FUNCTION neon_seqscan_rel(rel regclass, nprefetch int DEFAULT 0)
|
||||
RETURNS void
|
||||
AS 'MODULE_PATHNAME', 'neon_seqscan_rel'
|
||||
LANGUAGE C PARALLEL UNSAFE;
|
||||
|
||||
CREATE FUNCTION neon_xlogflush(lsn pg_lsn)
|
||||
RETURNS VOID
|
||||
AS 'MODULE_PATHNAME', 'neon_xlogflush'
|
||||
|
||||
@@ -23,8 +23,13 @@
|
||||
#include "utils/pg_lsn.h"
|
||||
#include "utils/rel.h"
|
||||
#include "utils/varlena.h"
|
||||
#include "utils/wait_event.h"
|
||||
#include "../neon/pagestore_client.h"
|
||||
|
||||
#include "libpq-fe.h"
|
||||
#include "libpq/pqformat.h"
|
||||
#include "libpq/libpq.h"
|
||||
|
||||
PG_MODULE_MAGIC;
|
||||
|
||||
extern void _PG_init(void);
|
||||
@@ -34,6 +39,7 @@ PG_FUNCTION_INFO_V1(clear_buffer_cache);
|
||||
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn);
|
||||
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn_ex);
|
||||
PG_FUNCTION_INFO_V1(neon_xlogflush);
|
||||
PG_FUNCTION_INFO_V1(neon_seqscan_rel);
|
||||
|
||||
/*
|
||||
* Linkage to functions in neon module.
|
||||
@@ -289,6 +295,238 @@ get_raw_page_at_lsn_ex(PG_FUNCTION_ARGS)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* A wrapper around PQgetCopyData that checks for interrupts while sleeping.
|
||||
*/
|
||||
static int
|
||||
call_PQgetCopyData(PGconn *conn, char **buffer)
|
||||
{
|
||||
int ret;
|
||||
|
||||
retry:
|
||||
ret = PQgetCopyData(conn, buffer, 1 /* async */ );
|
||||
|
||||
if (ret == 0)
|
||||
{
|
||||
int wc;
|
||||
|
||||
/* Sleep until there's something to do */
|
||||
wc = WaitLatchOrSocket(MyLatch,
|
||||
WL_LATCH_SET | WL_SOCKET_READABLE |
|
||||
WL_EXIT_ON_PM_DEATH,
|
||||
PQsocket(conn),
|
||||
-1L, PG_WAIT_EXTENSION);
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
/* Data available in socket? */
|
||||
if (wc & WL_SOCKET_READABLE)
|
||||
{
|
||||
if (!PQconsumeInput(conn))
|
||||
elog(ERROR, "could not get response from pageserver: %s",
|
||||
PQerrorMessage(conn));
|
||||
}
|
||||
|
||||
goto retry;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void send_getpage_request(PGconn *pageserver_conn, RelFileNode rnode, BlockNumber blkno, XLogRecPtr lsn);
|
||||
|
||||
/*
|
||||
* Fetch all pages of given relation. This simulates a sequential scan
|
||||
* over the table. You can specify the number of blocks to prefetch;
|
||||
* the function will try to keep that many requests "in flight" at all
|
||||
* times. The fetched pages are simply discarded.
|
||||
*/
|
||||
Datum
|
||||
neon_seqscan_rel(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Oid relid = PG_GETARG_OID(0);
|
||||
Oid nprefetch = PG_GETARG_INT32(1);
|
||||
Relation rel;
|
||||
char *raw_page_data;
|
||||
BlockNumber nblocks;
|
||||
PGconn *pageserver_conn;
|
||||
XLogRecPtr read_lsn;
|
||||
|
||||
if (!superuser())
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
|
||||
errmsg("must be superuser to use raw page functions")));
|
||||
|
||||
rel = relation_open(relid, AccessShareLock);
|
||||
|
||||
nblocks = RelationGetNumberOfBlocks(rel);
|
||||
|
||||
pageserver_conn = PQconnectdb(page_server_connstring);
|
||||
if (PQstatus(pageserver_conn) == CONNECTION_BAD)
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||
|
||||
PQfinish(pageserver_conn);
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
|
||||
errmsg("could not establish connection to pageserver"),
|
||||
errdetail_internal("%s", msg)));
|
||||
}
|
||||
PG_TRY();
|
||||
{
|
||||
char *query;
|
||||
int ret;
|
||||
StringInfoData resp_buff;
|
||||
|
||||
read_lsn = GetXLogInsertRecPtr();
|
||||
|
||||
query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
|
||||
ret = PQsendQuery(pageserver_conn, query);
|
||||
if (ret != 1)
|
||||
{
|
||||
PQfinish(pageserver_conn);
|
||||
pageserver_conn = NULL;
|
||||
elog(ERROR, "could not send pagestream command to pageserver");
|
||||
}
|
||||
|
||||
while (PQisBusy(pageserver_conn))
|
||||
{
|
||||
int wc;
|
||||
|
||||
/* Sleep until there's something to do */
|
||||
wc = WaitLatchOrSocket(MyLatch,
|
||||
WL_LATCH_SET | WL_SOCKET_READABLE |
|
||||
WL_EXIT_ON_PM_DEATH,
|
||||
PQsocket(pageserver_conn),
|
||||
-1L, PG_WAIT_EXTENSION);
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
/* Data available in socket? */
|
||||
if (wc & WL_SOCKET_READABLE)
|
||||
{
|
||||
if (!PQconsumeInput(pageserver_conn))
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||
|
||||
PQfinish(pageserver_conn);
|
||||
pageserver_conn = NULL;
|
||||
|
||||
elog(ERROR, "could not complete handshake with pageserver: %s",
|
||||
msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
elog(INFO, "scanning %u blocks, prefetch %u", nblocks, nprefetch);
|
||||
|
||||
BlockNumber nsent = 0;
|
||||
for (BlockNumber blkno = 0; blkno < nblocks; blkno++)
|
||||
{
|
||||
NeonGetPageRequest request = {
|
||||
.req.tag = T_NeonGetPageRequest,
|
||||
.req.latest = true,
|
||||
.req.lsn = read_lsn,
|
||||
.rnode = rel->rd_node,
|
||||
.forknum = MAIN_FORKNUM,
|
||||
.blkno = blkno
|
||||
};
|
||||
NeonResponse *resp;
|
||||
|
||||
if (blkno % 1024 == 0)
|
||||
elog(INFO, "blk %u/%u", blkno, nblocks);
|
||||
|
||||
if (nsent < blkno + nprefetch + 1 && nsent < nblocks)
|
||||
{
|
||||
while (nsent < blkno + nprefetch + 1 && nsent < nblocks)
|
||||
send_getpage_request(pageserver_conn, rel->rd_node, nsent++, read_lsn);
|
||||
|
||||
if (PQflush(pageserver_conn))
|
||||
{
|
||||
char *msg = PQerrorMessage(pageserver_conn);
|
||||
|
||||
elog(ERROR, "failed to flush page requests: %s", msg);
|
||||
}
|
||||
}
|
||||
|
||||
/* read response */
|
||||
resp_buff.len = call_PQgetCopyData(pageserver_conn, &resp_buff.data);
|
||||
resp_buff.cursor = 0;
|
||||
|
||||
if (resp_buff.len < 0)
|
||||
{
|
||||
if (resp_buff.len == -1)
|
||||
elog(ERROR, "end of COPY");
|
||||
else if (resp_buff.len == -2)
|
||||
elog(ERROR, "could not read COPY data: %s", PQerrorMessage(pageserver_conn));
|
||||
}
|
||||
resp = nm_unpack_response(&resp_buff);
|
||||
|
||||
switch (resp->tag)
|
||||
{
|
||||
case T_NeonGetPageResponse:
|
||||
/* ok */
|
||||
break;
|
||||
|
||||
case T_NeonErrorResponse:
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_IO_ERROR),
|
||||
errmsg("could not read block %u", blkno),
|
||||
errdetail("page server returned error: %s",
|
||||
((NeonErrorResponse *) resp)->message)));
|
||||
break;
|
||||
|
||||
default:
|
||||
elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
|
||||
}
|
||||
|
||||
PQfreemem(resp_buff.data);
|
||||
}
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
PQfinish(pageserver_conn);
|
||||
PG_RE_THROW();
|
||||
}
|
||||
PG_END_TRY();
|
||||
|
||||
relation_close(rel, AccessShareLock);
|
||||
}
|
||||
|
||||
static void
|
||||
send_getpage_request(PGconn *pageserver_conn, RelFileNode rnode, BlockNumber blkno, XLogRecPtr lsn)
|
||||
{
|
||||
NeonGetPageRequest request = {
|
||||
.req.tag = T_NeonGetPageRequest,
|
||||
.req.latest = true,
|
||||
.req.lsn = lsn,
|
||||
.rnode = rnode,
|
||||
.forknum = MAIN_FORKNUM,
|
||||
.blkno = blkno
|
||||
};
|
||||
StringInfoData req_buff;
|
||||
|
||||
req_buff = nm_pack_request(&request.req);
|
||||
/*
|
||||
* Send request.
|
||||
*
|
||||
* In principle, this could block if the output buffer is full, and we
|
||||
* should use async mode and check for interrupts while waiting. In
|
||||
* practice, our requests are small enough to always fit in the output and
|
||||
* TCP buffer.
|
||||
*/
|
||||
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
|
||||
{
|
||||
char *msg = PQerrorMessage(pageserver_conn);
|
||||
|
||||
elog(ERROR, "failed to send page request: %s", msg);
|
||||
}
|
||||
pfree(req_buff.data);
|
||||
}
|
||||
|
||||
/*
|
||||
* Directly calls XLogFlush(lsn) to flush WAL buffers.
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user