mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-27 23:30:38 +00:00
Compare commits
25 Commits
seqscan-pe
...
hotfix_pub
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
26e1e430c0 | ||
|
|
f3aac81d19 | ||
|
|
2709878b8b | ||
|
|
39e4bdb99e | ||
|
|
979ad60c19 | ||
|
|
9316cb1b1f | ||
|
|
e7939a527a | ||
|
|
36d26665e1 | ||
|
|
873347f977 | ||
|
|
e814ac16f9 | ||
|
|
ad3055d386 | ||
|
|
94e03eb452 | ||
|
|
380f26ef79 | ||
|
|
3c5b7f59d7 | ||
|
|
fee89f80b5 | ||
|
|
41cce8eaf1 | ||
|
|
f88fe0218d | ||
|
|
cc856eca85 | ||
|
|
cf350c6002 | ||
|
|
0ce6b6a0a3 | ||
|
|
73f247d537 | ||
|
|
960be82183 | ||
|
|
806e5a6c19 | ||
|
|
8d5df07cce | ||
|
|
df7a9d1407 |
31
.github/helm-values/dev-us-east-2-beta.neon-proxy-scram.yaml
vendored
Normal file
31
.github/helm-values/dev-us-east-2-beta.neon-proxy-scram.yaml
vendored
Normal file
@@ -0,0 +1,31 @@
|
||||
# Helm chart values for neon-proxy-scram.
|
||||
# This is a YAML-formatted file.
|
||||
|
||||
image:
|
||||
repository: neondatabase/neon
|
||||
|
||||
settings:
|
||||
authBackend: "console"
|
||||
authEndpoint: "http://console-staging.local/management/api/v2"
|
||||
domain: "*.us-east-2.aws.neon.build"
|
||||
|
||||
# -- Additional labels for neon-proxy pods
|
||||
podLabels:
|
||||
zenith_service: proxy-scram
|
||||
zenith_env: dev
|
||||
zenith_region: us-east-2
|
||||
zenith_region_slug: us-east-2
|
||||
|
||||
exposedService:
|
||||
annotations:
|
||||
service.beta.kubernetes.io/aws-load-balancer-type: external
|
||||
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
|
||||
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
|
||||
external-dns.alpha.kubernetes.io/hostname: us-east-2.aws.neon.build
|
||||
|
||||
#metrics:
|
||||
# enabled: true
|
||||
# serviceMonitor:
|
||||
# enabled: true
|
||||
# selector:
|
||||
# release: kube-prometheus-stack
|
||||
28
.github/workflows/build_and_test.yml
vendored
28
.github/workflows/build_and_test.yml
vendored
@@ -825,3 +825,31 @@ jobs:
|
||||
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
|
||||
helm upgrade ${{ matrix.proxy_job }} neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
|
||||
helm upgrade ${{ matrix.proxy_job }}-scram neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
|
||||
|
||||
deploy-proxy-new:
|
||||
runs-on: dev
|
||||
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned
|
||||
# Compute image isn't strictly required for proxy deploy, but let's still wait for it to run all deploy jobs consistently.
|
||||
needs: [ push-docker-hub, calculate-deploy-targets, tag, regress-tests ]
|
||||
if: |
|
||||
(github.ref_name == 'main' || github.ref_name == 'release') &&
|
||||
github.event_name != 'workflow_dispatch'
|
||||
defaults:
|
||||
run:
|
||||
shell: bash
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v3
|
||||
with:
|
||||
submodules: true
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Configure environment
|
||||
run: |
|
||||
helm repo add neondatabase https://neondatabase.github.io/helm-charts
|
||||
aws --region us-east-2 eks update-kubeconfig --name dev-us-east-2-beta --role-arn arn:aws:iam::369495373322:role/github-runner
|
||||
|
||||
- name: Re-deploy proxy
|
||||
run: |
|
||||
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
|
||||
helm upgrade neon-proxy-scram neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install -f .github/helm-values/dev-us-east-2-beta.neon-proxy-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
|
||||
|
||||
@@ -425,7 +425,28 @@ pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> {
|
||||
|
||||
// Explicitly grant CREATE ON SCHEMA PUBLIC to the web_access user.
|
||||
// This is needed since postgres 15, where this privilege is removed by default.
|
||||
let grant_query: String = "GRANT CREATE ON SCHEMA public TO web_access".to_string();
|
||||
let grant_query = "DO $$\n\
|
||||
BEGIN\n\
|
||||
IF EXISTS(\n\
|
||||
SELECT nspname\n\
|
||||
FROM pg_catalog.pg_namespace\n\
|
||||
WHERE nspname = 'public'\n\
|
||||
) AND\n\
|
||||
version() LIKE 'PostgreSQL 15%'\n\
|
||||
THEN\n\
|
||||
IF EXISTS(\n\
|
||||
SELECT rolname\n\
|
||||
FROM pg_catalog.pg_roles\n\
|
||||
WHERE rolname = 'web_access'\n\
|
||||
)\n\
|
||||
THEN\n\
|
||||
GRANT CREATE ON SCHEMA public TO web_access;\n\
|
||||
END IF;\n\
|
||||
END IF;\n\
|
||||
END\n\
|
||||
$$;"
|
||||
.to_string();
|
||||
|
||||
info!("grant query for db {} : {}", &db.name, &grant_query);
|
||||
db_client.simple_query(&grant_query)?;
|
||||
}
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
//! parent timeline, and the last LSN that has been written to disk.
|
||||
//!
|
||||
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use anyhow::{bail, ensure, Context};
|
||||
use tokio::sync::watch;
|
||||
use tracing::*;
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
@@ -25,7 +25,6 @@ use std::fs::File;
|
||||
use std::fs::OpenOptions;
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
use std::num::NonZeroU64;
|
||||
use std::ops::Bound::Included;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
@@ -292,7 +291,7 @@ impl TimelineUninitMark {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn delete_mark_file_if_present(&mut self) -> Result<(), anyhow::Error> {
|
||||
fn delete_mark_file_if_present(&mut self) -> anyhow::Result<()> {
|
||||
let uninit_mark_file = &self.uninit_mark_path;
|
||||
let uninit_mark_parent = uninit_mark_file
|
||||
.parent()
|
||||
@@ -470,7 +469,7 @@ impl Tenant {
|
||||
horizon: u64,
|
||||
pitr: Duration,
|
||||
checkpoint_before_gc: bool,
|
||||
) -> Result<GcResult> {
|
||||
) -> anyhow::Result<GcResult> {
|
||||
let timeline_str = target_timeline_id
|
||||
.map(|x| x.to_string())
|
||||
.unwrap_or_else(|| "-".to_string());
|
||||
@@ -486,7 +485,7 @@ impl Tenant {
|
||||
/// This function is periodically called by compactor task.
|
||||
/// Also it can be explicitly requested per timeline through page server
|
||||
/// api's 'compact' command.
|
||||
pub fn compaction_iteration(&self) -> Result<()> {
|
||||
pub fn compaction_iteration(&self) -> anyhow::Result<()> {
|
||||
// Scan through the hashmap and collect a list of all the timelines,
|
||||
// while holding the lock. Then drop the lock and actually perform the
|
||||
// compactions. We don't want to block everything else while the
|
||||
@@ -510,7 +509,7 @@ impl Tenant {
|
||||
///
|
||||
/// Used at graceful shutdown.
|
||||
///
|
||||
pub fn checkpoint(&self) -> Result<()> {
|
||||
pub fn checkpoint(&self) -> anyhow::Result<()> {
|
||||
// Scan through the hashmap and collect a list of all the timelines,
|
||||
// while holding the lock. Then drop the lock and actually perform the
|
||||
// checkpoints. We don't want to block everything else while the
|
||||
@@ -681,7 +680,7 @@ impl Tenant {
|
||||
/// before the children.
|
||||
fn tree_sort_timelines(
|
||||
timelines: HashMap<TimelineId, TimelineMetadata>,
|
||||
) -> Result<Vec<(TimelineId, TimelineMetadata)>> {
|
||||
) -> anyhow::Result<Vec<(TimelineId, TimelineMetadata)>> {
|
||||
let mut result = Vec::with_capacity(timelines.len());
|
||||
|
||||
let mut now = Vec::with_capacity(timelines.len());
|
||||
@@ -784,27 +783,6 @@ impl Tenant {
|
||||
.unwrap_or(self.conf.default_tenant_conf.pitr_interval)
|
||||
}
|
||||
|
||||
pub fn get_wal_receiver_connect_timeout(&self) -> Duration {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap();
|
||||
tenant_conf
|
||||
.walreceiver_connect_timeout
|
||||
.unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout)
|
||||
}
|
||||
|
||||
pub fn get_lagging_wal_timeout(&self) -> Duration {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap();
|
||||
tenant_conf
|
||||
.lagging_wal_timeout
|
||||
.unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout)
|
||||
}
|
||||
|
||||
pub fn get_max_lsn_wal_lag(&self) -> NonZeroU64 {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap();
|
||||
tenant_conf
|
||||
.max_lsn_wal_lag
|
||||
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag)
|
||||
}
|
||||
|
||||
pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
|
||||
self.tenant_conf.write().unwrap().update(&new_tenant_conf);
|
||||
}
|
||||
@@ -836,7 +814,7 @@ impl Tenant {
|
||||
))
|
||||
}
|
||||
|
||||
pub fn new(
|
||||
pub(super) fn new(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: TenantConfOpt,
|
||||
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
|
||||
@@ -859,7 +837,7 @@ impl Tenant {
|
||||
}
|
||||
|
||||
/// Locate and load config
|
||||
pub fn load_tenant_config(
|
||||
pub(super) fn load_tenant_config(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
) -> anyhow::Result<TenantConfOpt> {
|
||||
@@ -901,7 +879,7 @@ impl Tenant {
|
||||
Ok(tenant_conf)
|
||||
}
|
||||
|
||||
pub fn persist_tenant_config(
|
||||
pub(super) fn persist_tenant_config(
|
||||
target_config_path: &Path,
|
||||
tenant_conf: TenantConfOpt,
|
||||
first_save: bool,
|
||||
@@ -994,7 +972,7 @@ impl Tenant {
|
||||
horizon: u64,
|
||||
pitr: Duration,
|
||||
checkpoint_before_gc: bool,
|
||||
) -> Result<GcResult> {
|
||||
) -> anyhow::Result<GcResult> {
|
||||
let mut totals: GcResult = Default::default();
|
||||
let now = Instant::now();
|
||||
|
||||
@@ -1411,7 +1389,7 @@ fn run_initdb(
|
||||
conf: &'static PageServerConf,
|
||||
initdb_target_dir: &Path,
|
||||
pg_version: u32,
|
||||
) -> Result<()> {
|
||||
) -> anyhow::Result<()> {
|
||||
let initdb_bin_path = conf.pg_bin_dir(pg_version)?.join("initdb");
|
||||
let initdb_lib_dir = conf.pg_lib_dir(pg_version)?;
|
||||
info!(
|
||||
@@ -1457,7 +1435,7 @@ impl Drop for Tenant {
|
||||
}
|
||||
}
|
||||
/// Dump contents of a layer file to stdout.
|
||||
pub fn dump_layerfile_from_path(path: &Path, verbose: bool) -> Result<()> {
|
||||
pub fn dump_layerfile_from_path(path: &Path, verbose: bool) -> anyhow::Result<()> {
|
||||
use std::os::unix::fs::FileExt;
|
||||
|
||||
// All layer files start with a two-byte "magic" value, to identify the kind of
|
||||
@@ -1562,13 +1540,13 @@ pub mod harness {
|
||||
}
|
||||
|
||||
impl<'a> TenantHarness<'a> {
|
||||
pub fn create(test_name: &'static str) -> Result<Self> {
|
||||
pub fn create(test_name: &'static str) -> anyhow::Result<Self> {
|
||||
Self::create_internal(test_name, false)
|
||||
}
|
||||
pub fn create_exclusive(test_name: &'static str) -> Result<Self> {
|
||||
pub fn create_exclusive(test_name: &'static str) -> anyhow::Result<Self> {
|
||||
Self::create_internal(test_name, true)
|
||||
}
|
||||
fn create_internal(test_name: &'static str, exclusive: bool) -> Result<Self> {
|
||||
fn create_internal(test_name: &'static str, exclusive: bool) -> anyhow::Result<Self> {
|
||||
let lock_guard = if exclusive {
|
||||
(None, Some(LOCK.write().unwrap()))
|
||||
} else {
|
||||
@@ -1602,7 +1580,7 @@ pub mod harness {
|
||||
self.try_load().expect("failed to load test tenant")
|
||||
}
|
||||
|
||||
pub fn try_load(&self) -> Result<Tenant> {
|
||||
pub fn try_load(&self) -> anyhow::Result<Tenant> {
|
||||
let walredo_mgr = Arc::new(TestRedoManager);
|
||||
|
||||
let tenant = Tenant::new(
|
||||
@@ -1682,7 +1660,7 @@ pub mod harness {
|
||||
},
|
||||
records.len()
|
||||
);
|
||||
println!("{}", s);
|
||||
println!("{s}");
|
||||
|
||||
Ok(TEST_IMG(&s))
|
||||
}
|
||||
@@ -1706,7 +1684,7 @@ mod tests {
|
||||
Lazy::new(|| Key::from_slice(&hex!("112222222233333333444444445500000001")));
|
||||
|
||||
#[test]
|
||||
fn test_basic() -> Result<()> {
|
||||
fn test_basic() -> anyhow::Result<()> {
|
||||
let tenant = TenantHarness::create("test_basic")?.load();
|
||||
let tline = tenant
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
|
||||
@@ -1730,7 +1708,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn no_duplicate_timelines() -> Result<()> {
|
||||
fn no_duplicate_timelines() -> anyhow::Result<()> {
|
||||
let tenant = TenantHarness::create("no_duplicate_timelines")?.load();
|
||||
let _ = tenant
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
|
||||
@@ -1761,7 +1739,7 @@ mod tests {
|
||||
/// Test branch creation
|
||||
///
|
||||
#[test]
|
||||
fn test_branch() -> Result<()> {
|
||||
fn test_branch() -> anyhow::Result<()> {
|
||||
let tenant = TenantHarness::create("test_branch")?.load();
|
||||
let tline = tenant
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
|
||||
@@ -1814,7 +1792,7 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn make_some_layers(tline: &Timeline, start_lsn: Lsn) -> Result<()> {
|
||||
fn make_some_layers(tline: &Timeline, start_lsn: Lsn) -> anyhow::Result<()> {
|
||||
let mut lsn = start_lsn;
|
||||
#[allow(non_snake_case)]
|
||||
{
|
||||
@@ -1856,7 +1834,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prohibit_branch_creation_on_garbage_collected_data() -> Result<()> {
|
||||
fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
|
||||
let tenant =
|
||||
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
|
||||
.load();
|
||||
@@ -1888,7 +1866,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> Result<()> {
|
||||
fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> anyhow::Result<()> {
|
||||
let tenant =
|
||||
TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?.load();
|
||||
|
||||
@@ -1915,7 +1893,7 @@ mod tests {
|
||||
// FIXME: This currently fails to error out. Calling GC doesn't currently
|
||||
// remove the old value, we'd need to work a little harder
|
||||
#[test]
|
||||
fn test_prohibit_get_for_garbage_collected_data() -> Result<()> {
|
||||
fn test_prohibit_get_for_garbage_collected_data() -> anyhow::Result<()> {
|
||||
let repo =
|
||||
RepoHarness::create("test_prohibit_get_for_garbage_collected_data")?
|
||||
.load();
|
||||
@@ -1935,7 +1913,7 @@ mod tests {
|
||||
*/
|
||||
|
||||
#[test]
|
||||
fn test_retain_data_in_parent_which_is_needed_for_child() -> Result<()> {
|
||||
fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
|
||||
let tenant =
|
||||
TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?.load();
|
||||
let tline = tenant
|
||||
@@ -1954,7 +1932,7 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
#[test]
|
||||
fn test_parent_keeps_data_forever_after_branching() -> Result<()> {
|
||||
fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
|
||||
let tenant =
|
||||
TenantHarness::create("test_parent_keeps_data_forever_after_branching")?.load();
|
||||
let tline = tenant
|
||||
@@ -1982,7 +1960,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn timeline_load() -> Result<()> {
|
||||
fn timeline_load() -> anyhow::Result<()> {
|
||||
const TEST_NAME: &str = "timeline_load";
|
||||
let harness = TenantHarness::create(TEST_NAME)?;
|
||||
{
|
||||
@@ -2003,7 +1981,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn timeline_load_with_ancestor() -> Result<()> {
|
||||
fn timeline_load_with_ancestor() -> anyhow::Result<()> {
|
||||
const TEST_NAME: &str = "timeline_load_with_ancestor";
|
||||
let harness = TenantHarness::create(TEST_NAME)?;
|
||||
// create two timelines
|
||||
@@ -2042,7 +2020,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn corrupt_metadata() -> Result<()> {
|
||||
fn corrupt_metadata() -> anyhow::Result<()> {
|
||||
const TEST_NAME: &str = "corrupt_metadata";
|
||||
let harness = TenantHarness::create(TEST_NAME)?;
|
||||
let tenant = harness.load();
|
||||
@@ -2084,7 +2062,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_images() -> Result<()> {
|
||||
fn test_images() -> anyhow::Result<()> {
|
||||
let tenant = TenantHarness::create("test_images")?.load();
|
||||
let tline = tenant
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
|
||||
@@ -2136,7 +2114,7 @@ mod tests {
|
||||
// repeat 50 times.
|
||||
//
|
||||
#[test]
|
||||
fn test_bulk_insert() -> Result<()> {
|
||||
fn test_bulk_insert() -> anyhow::Result<()> {
|
||||
let tenant = TenantHarness::create("test_bulk_insert")?.load();
|
||||
let tline = tenant
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
|
||||
@@ -2178,7 +2156,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_random_updates() -> Result<()> {
|
||||
fn test_random_updates() -> anyhow::Result<()> {
|
||||
let tenant = TenantHarness::create("test_random_updates")?.load();
|
||||
let tline = tenant
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
|
||||
@@ -2250,7 +2228,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_traverse_branches() -> Result<()> {
|
||||
fn test_traverse_branches() -> anyhow::Result<()> {
|
||||
let tenant = TenantHarness::create("test_traverse_branches")?.load();
|
||||
let mut tline = tenant
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
|
||||
@@ -2331,7 +2309,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_traverse_ancestors() -> Result<()> {
|
||||
fn test_traverse_ancestors() -> anyhow::Result<()> {
|
||||
let tenant = TenantHarness::create("test_traverse_ancestors")?.load();
|
||||
let mut tline = tenant
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
//!
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use anyhow::{anyhow, bail, ensure, Context};
|
||||
use bytes::Bytes;
|
||||
use fail::fail_point;
|
||||
use itertools::Itertools;
|
||||
@@ -307,10 +307,6 @@ pub struct GcInfo {
|
||||
|
||||
/// Public interface functions
|
||||
impl Timeline {
|
||||
//------------------------------------------------------------------------------
|
||||
// Public GET functions
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
/// Get the LSN where this branch was created
|
||||
pub fn get_ancestor_lsn(&self) -> Lsn {
|
||||
self.ancestor_lsn
|
||||
@@ -445,7 +441,7 @@ impl Timeline {
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
|
||||
) -> Result<()> {
|
||||
) -> anyhow::Result<()> {
|
||||
ensure!(
|
||||
lsn >= **latest_gc_cutoff_lsn,
|
||||
"LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
|
||||
@@ -455,12 +451,6 @@ impl Timeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Public PUT functions, to update the repository with new page versions.
|
||||
//
|
||||
// These are called by the WAL receiver to digest WAL records.
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
/// Flush to disk all data that was written with the put_* functions
|
||||
///
|
||||
/// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
|
||||
@@ -479,6 +469,91 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn compact(&self) -> anyhow::Result<()> {
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
|
||||
// Last record Lsn could be zero in case the timelie was just created
|
||||
if !last_record_lsn.is_valid() {
|
||||
warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
//
|
||||
// High level strategy for compaction / image creation:
|
||||
//
|
||||
// 1. First, calculate the desired "partitioning" of the
|
||||
// currently in-use key space. The goal is to partition the
|
||||
// key space into roughly fixed-size chunks, but also take into
|
||||
// account any existing image layers, and try to align the
|
||||
// chunk boundaries with the existing image layers to avoid
|
||||
// too much churn. Also try to align chunk boundaries with
|
||||
// relation boundaries. In principle, we don't know about
|
||||
// relation boundaries here, we just deal with key-value
|
||||
// pairs, and the code in pgdatadir_mapping.rs knows how to
|
||||
// map relations into key-value pairs. But in practice we know
|
||||
// that 'field6' is the block number, and the fields 1-5
|
||||
// identify a relation. This is just an optimization,
|
||||
// though.
|
||||
//
|
||||
// 2. Once we know the partitioning, for each partition,
|
||||
// decide if it's time to create a new image layer. The
|
||||
// criteria is: there has been too much "churn" since the last
|
||||
// image layer? The "churn" is fuzzy concept, it's a
|
||||
// combination of too many delta files, or too much WAL in
|
||||
// total in the delta file. Or perhaps: if creating an image
|
||||
// file would allow to delete some older files.
|
||||
//
|
||||
// 3. After that, we compact all level0 delta files if there
|
||||
// are too many of them. While compacting, we also garbage
|
||||
// collect any page versions that are no longer needed because
|
||||
// of the new image layers we created in step 2.
|
||||
//
|
||||
// TODO: This high level strategy hasn't been implemented yet.
|
||||
// Below are functions compact_level0() and create_image_layers()
|
||||
// but they are a bit ad hoc and don't quite work like it's explained
|
||||
// above. Rewrite it.
|
||||
let _layer_removal_cs = self.layer_removal_cs.lock().unwrap();
|
||||
|
||||
let target_file_size = self.get_checkpoint_distance();
|
||||
|
||||
// Define partitioning schema if needed
|
||||
|
||||
match self.repartition(
|
||||
self.get_last_record_lsn(),
|
||||
self.get_compaction_target_size(),
|
||||
) {
|
||||
Ok((partitioning, lsn)) => {
|
||||
// 2. Create new image layers for partitions that have been modified
|
||||
// "enough".
|
||||
let layer_paths_to_upload = self.create_image_layers(&partitioning, lsn, false)?;
|
||||
if !layer_paths_to_upload.is_empty()
|
||||
&& self.upload_layers.load(atomic::Ordering::Relaxed)
|
||||
{
|
||||
storage_sync::schedule_layer_upload(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
layer_paths_to_upload,
|
||||
None,
|
||||
);
|
||||
}
|
||||
|
||||
// 3. Compact
|
||||
let timer = self.metrics.compact_time_histo.start_timer();
|
||||
self.compact_level0(target_file_size)?;
|
||||
timer.stop_and_record();
|
||||
}
|
||||
Err(err) => {
|
||||
// no partitioning? This is normal, if the timeline was just created
|
||||
// as an empty timeline. Also in unit tests, when we use the timeline
|
||||
// as a simple key-value store, ignoring the datadir layout. Log the
|
||||
// error but continue.
|
||||
error!("could not compact, repartitioning keyspace failed: {err:?}");
|
||||
}
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Mutate the timeline with a [`TimelineWriter`].
|
||||
pub fn writer(&self) -> TimelineWriter<'_> {
|
||||
TimelineWriter {
|
||||
@@ -486,6 +561,80 @@ impl Timeline {
|
||||
_write_guard: self.write_lock.lock().unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Retrieve current logical size of the timeline.
|
||||
///
|
||||
/// The size could be lagging behind the actual number, in case
|
||||
/// the initial size calculation has not been run (gets triggered on the first size access).
|
||||
pub fn get_current_logical_size(self: &Arc<Self>) -> anyhow::Result<u64> {
|
||||
let current_size = self.current_logical_size.current_size()?;
|
||||
debug!("Current size: {current_size:?}");
|
||||
|
||||
let size = current_size.size();
|
||||
if let (CurrentLogicalSize::Approximate(_), Some(init_lsn)) =
|
||||
(current_size, self.current_logical_size.initial_part_end)
|
||||
{
|
||||
self.try_spawn_size_init_task(init_lsn);
|
||||
}
|
||||
|
||||
Ok(size)
|
||||
}
|
||||
|
||||
/// Check if more than 'checkpoint_distance' of WAL has been accumulated in
|
||||
/// the in-memory layer, and initiate flushing it if so.
|
||||
///
|
||||
/// Also flush after a period of time without new data -- it helps
|
||||
/// safekeepers to regard pageserver as caught up and suspend activity.
|
||||
pub fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
|
||||
let last_lsn = self.get_last_record_lsn();
|
||||
let layers = self.layers.read().unwrap();
|
||||
if let Some(open_layer) = &layers.open_layer {
|
||||
let open_layer_size = open_layer.size()?;
|
||||
drop(layers);
|
||||
let last_freeze_at = self.last_freeze_at.load();
|
||||
let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
|
||||
let distance = last_lsn.widening_sub(last_freeze_at);
|
||||
// Checkpointing the open layer can be triggered by layer size or LSN range.
|
||||
// S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
|
||||
// we want to stay below that with a big margin. The LSN distance determines how
|
||||
// much WAL the safekeepers need to store.
|
||||
if distance >= self.get_checkpoint_distance().into()
|
||||
|| open_layer_size > self.get_checkpoint_distance()
|
||||
|| (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
|
||||
{
|
||||
info!(
|
||||
"check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
|
||||
distance,
|
||||
open_layer_size,
|
||||
last_freeze_ts.elapsed()
|
||||
);
|
||||
|
||||
self.freeze_inmem_layer(true);
|
||||
self.last_freeze_at.store(last_lsn);
|
||||
*(self.last_freeze_ts.write().unwrap()) = Instant::now();
|
||||
|
||||
// Launch a task to flush the frozen layer to disk, unless
|
||||
// a task was already running. (If the task was running
|
||||
// at the time that we froze the layer, it must've seen the
|
||||
// the layer we just froze before it exited; see comments
|
||||
// in flush_frozen_layers())
|
||||
if let Ok(guard) = self.layer_flush_lock.try_lock() {
|
||||
drop(guard);
|
||||
let self_clone = Arc::clone(self);
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
task_mgr::TaskKind::LayerFlushTask,
|
||||
Some(self.tenant_id),
|
||||
Some(self.timeline_id),
|
||||
"layer flush task",
|
||||
false,
|
||||
async move { self_clone.flush_frozen_layers(false) },
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// Private functions
|
||||
@@ -529,7 +678,7 @@ impl Timeline {
|
||||
///
|
||||
/// Loads the metadata for the timeline into memory, but not the layer map.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
pub(super) fn new(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: Arc<RwLock<TenantConfOpt>>,
|
||||
metadata: TimelineMetadata,
|
||||
@@ -602,7 +751,7 @@ impl Timeline {
|
||||
result
|
||||
}
|
||||
|
||||
pub fn launch_wal_receiver(self: &Arc<Self>) {
|
||||
pub(super) fn launch_wal_receiver(self: &Arc<Self>) {
|
||||
if !is_etcd_client_initialized() {
|
||||
if cfg!(test) {
|
||||
info!("not launching WAL receiver because etcd client hasn't been initialized");
|
||||
@@ -641,7 +790,7 @@ impl Timeline {
|
||||
/// Scan the timeline directory to populate the layer map.
|
||||
/// Returns all timeline-related files that were found and loaded.
|
||||
///
|
||||
pub fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
|
||||
pub(super) fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut num_layers = 0;
|
||||
|
||||
@@ -727,30 +876,12 @@ impl Timeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn layer_removal_guard(&self) -> anyhow::Result<MutexGuard<()>> {
|
||||
pub(super) fn layer_removal_guard(&self) -> anyhow::Result<MutexGuard<()>> {
|
||||
self.layer_removal_cs
|
||||
.try_lock()
|
||||
.map_err(|e| anyhow!("cannot lock compaction critical section {e}"))
|
||||
}
|
||||
|
||||
/// Retrieve current logical size of the timeline.
|
||||
///
|
||||
/// The size could be lagging behind the actual number, in case
|
||||
/// the initial size calculation has not been run (gets triggered on the first size access).
|
||||
pub fn get_current_logical_size(self: &Arc<Self>) -> anyhow::Result<u64> {
|
||||
let current_size = self.current_logical_size.current_size()?;
|
||||
debug!("Current size: {current_size:?}");
|
||||
|
||||
let size = current_size.size();
|
||||
if let (CurrentLogicalSize::Approximate(_), Some(init_lsn)) =
|
||||
(current_size, self.current_logical_size.initial_part_end)
|
||||
{
|
||||
self.try_spawn_size_init_task(init_lsn);
|
||||
}
|
||||
|
||||
Ok(size)
|
||||
}
|
||||
|
||||
fn try_spawn_size_init_task(self: &Arc<Self>, init_lsn: Lsn) {
|
||||
let timeline_id = self.timeline_id;
|
||||
|
||||
@@ -971,7 +1102,7 @@ impl Timeline {
|
||||
Some((lsn, img))
|
||||
}
|
||||
|
||||
fn get_ancestor_timeline(&self) -> Result<Arc<Timeline>> {
|
||||
fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
|
||||
let ancestor = self.ancestor_timeline.as_ref().with_context(|| {
|
||||
format!(
|
||||
"Ancestor is missing. Timeline id: {} Ancestor id {:?}",
|
||||
@@ -1030,14 +1161,14 @@ impl Timeline {
|
||||
Ok(layer)
|
||||
}
|
||||
|
||||
fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> {
|
||||
fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> {
|
||||
//info!("PUT: key {} at {}", key, lsn);
|
||||
let layer = self.get_layer_for_write(lsn)?;
|
||||
layer.put_value(key, lsn, val)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> Result<()> {
|
||||
fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
|
||||
let layer = self.get_layer_for_write(lsn)?;
|
||||
layer.put_tombstone(key_range, lsn)?;
|
||||
|
||||
@@ -1076,64 +1207,6 @@ impl Timeline {
|
||||
drop(layers);
|
||||
}
|
||||
|
||||
///
|
||||
/// Check if more than 'checkpoint_distance' of WAL has been accumulated in
|
||||
/// the in-memory layer, and initiate flushing it if so.
|
||||
///
|
||||
/// Also flush after a period of time without new data -- it helps
|
||||
/// safekeepers to regard pageserver as caught up and suspend activity.
|
||||
///
|
||||
pub fn check_checkpoint_distance(self: &Arc<Timeline>) -> Result<()> {
|
||||
let last_lsn = self.get_last_record_lsn();
|
||||
let layers = self.layers.read().unwrap();
|
||||
if let Some(open_layer) = &layers.open_layer {
|
||||
let open_layer_size = open_layer.size()?;
|
||||
drop(layers);
|
||||
let last_freeze_at = self.last_freeze_at.load();
|
||||
let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
|
||||
let distance = last_lsn.widening_sub(last_freeze_at);
|
||||
// Checkpointing the open layer can be triggered by layer size or LSN range.
|
||||
// S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
|
||||
// we want to stay below that with a big margin. The LSN distance determines how
|
||||
// much WAL the safekeepers need to store.
|
||||
if distance >= self.get_checkpoint_distance().into()
|
||||
|| open_layer_size > self.get_checkpoint_distance()
|
||||
|| (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
|
||||
{
|
||||
info!(
|
||||
"check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
|
||||
distance,
|
||||
open_layer_size,
|
||||
last_freeze_ts.elapsed()
|
||||
);
|
||||
|
||||
self.freeze_inmem_layer(true);
|
||||
self.last_freeze_at.store(last_lsn);
|
||||
*(self.last_freeze_ts.write().unwrap()) = Instant::now();
|
||||
|
||||
// Launch a task to flush the frozen layer to disk, unless
|
||||
// a task was already running. (If the task was running
|
||||
// at the time that we froze the layer, it must've seen the
|
||||
// the layer we just froze before it exited; see comments
|
||||
// in flush_frozen_layers())
|
||||
if let Ok(guard) = self.layer_flush_lock.try_lock() {
|
||||
drop(guard);
|
||||
let self_clone = Arc::clone(self);
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
task_mgr::TaskKind::LayerFlushTask,
|
||||
Some(self.tenant_id),
|
||||
Some(self.timeline_id),
|
||||
"layer flush task",
|
||||
false,
|
||||
async move { self_clone.flush_frozen_layers(false) },
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Flush all frozen layers to disk.
|
||||
///
|
||||
/// Only one task at a time can be doing layer-flushing for a
|
||||
@@ -1141,7 +1214,7 @@ impl Timeline {
|
||||
/// currently doing the flushing, this function will wait for it
|
||||
/// to finish. If 'wait' is false, this function will return
|
||||
/// immediately instead.
|
||||
fn flush_frozen_layers(&self, wait: bool) -> Result<()> {
|
||||
fn flush_frozen_layers(&self, wait: bool) -> anyhow::Result<()> {
|
||||
let flush_lock_guard = if wait {
|
||||
self.layer_flush_lock.lock().unwrap()
|
||||
} else {
|
||||
@@ -1180,7 +1253,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Flush one frozen in-memory layer to disk, as a new delta layer.
|
||||
fn flush_frozen_layer(&self, frozen_layer: Arc<InMemoryLayer>) -> Result<()> {
|
||||
fn flush_frozen_layer(&self, frozen_layer: Arc<InMemoryLayer>) -> anyhow::Result<()> {
|
||||
// As a special case, when we have just imported an image into the repository,
|
||||
// instead of writing out a L0 delta layer, we directly write out image layer
|
||||
// files instead. This is possible as long as *all* the data imported into the
|
||||
@@ -1238,7 +1311,7 @@ impl Timeline {
|
||||
&self,
|
||||
disk_consistent_lsn: Lsn,
|
||||
layer_paths_to_upload: HashMap<PathBuf, LayerFileMetadata>,
|
||||
) -> Result<()> {
|
||||
) -> anyhow::Result<()> {
|
||||
// We can only save a valid 'prev_record_lsn' value on disk if we
|
||||
// flushed *all* in-memory changes to disk. We only track
|
||||
// 'prev_record_lsn' in memory for the latest processed record, so we
|
||||
@@ -1299,7 +1372,7 @@ impl Timeline {
|
||||
fn create_delta_layer(
|
||||
&self,
|
||||
frozen_layer: &InMemoryLayer,
|
||||
) -> Result<(PathBuf, LayerFileMetadata)> {
|
||||
) -> anyhow::Result<(PathBuf, LayerFileMetadata)> {
|
||||
// Write it out
|
||||
let new_delta = frozen_layer.write_to_disk()?;
|
||||
let new_delta_path = new_delta.path();
|
||||
@@ -1334,92 +1407,7 @@ impl Timeline {
|
||||
Ok((new_delta_path, LayerFileMetadata::new(sz)))
|
||||
}
|
||||
|
||||
pub fn compact(&self) -> anyhow::Result<()> {
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
|
||||
// Last record Lsn could be zero in case the timelie was just created
|
||||
if !last_record_lsn.is_valid() {
|
||||
warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
//
|
||||
// High level strategy for compaction / image creation:
|
||||
//
|
||||
// 1. First, calculate the desired "partitioning" of the
|
||||
// currently in-use key space. The goal is to partition the
|
||||
// key space into roughly fixed-size chunks, but also take into
|
||||
// account any existing image layers, and try to align the
|
||||
// chunk boundaries with the existing image layers to avoid
|
||||
// too much churn. Also try to align chunk boundaries with
|
||||
// relation boundaries. In principle, we don't know about
|
||||
// relation boundaries here, we just deal with key-value
|
||||
// pairs, and the code in pgdatadir_mapping.rs knows how to
|
||||
// map relations into key-value pairs. But in practice we know
|
||||
// that 'field6' is the block number, and the fields 1-5
|
||||
// identify a relation. This is just an optimization,
|
||||
// though.
|
||||
//
|
||||
// 2. Once we know the partitioning, for each partition,
|
||||
// decide if it's time to create a new image layer. The
|
||||
// criteria is: there has been too much "churn" since the last
|
||||
// image layer? The "churn" is fuzzy concept, it's a
|
||||
// combination of too many delta files, or too much WAL in
|
||||
// total in the delta file. Or perhaps: if creating an image
|
||||
// file would allow to delete some older files.
|
||||
//
|
||||
// 3. After that, we compact all level0 delta files if there
|
||||
// are too many of them. While compacting, we also garbage
|
||||
// collect any page versions that are no longer needed because
|
||||
// of the new image layers we created in step 2.
|
||||
//
|
||||
// TODO: This high level strategy hasn't been implemented yet.
|
||||
// Below are functions compact_level0() and create_image_layers()
|
||||
// but they are a bit ad hoc and don't quite work like it's explained
|
||||
// above. Rewrite it.
|
||||
let _layer_removal_cs = self.layer_removal_cs.lock().unwrap();
|
||||
|
||||
let target_file_size = self.get_checkpoint_distance();
|
||||
|
||||
// Define partitioning schema if needed
|
||||
|
||||
match self.repartition(
|
||||
self.get_last_record_lsn(),
|
||||
self.get_compaction_target_size(),
|
||||
) {
|
||||
Ok((partitioning, lsn)) => {
|
||||
// 2. Create new image layers for partitions that have been modified
|
||||
// "enough".
|
||||
let layer_paths_to_upload = self.create_image_layers(&partitioning, lsn, false)?;
|
||||
if !layer_paths_to_upload.is_empty()
|
||||
&& self.upload_layers.load(atomic::Ordering::Relaxed)
|
||||
{
|
||||
storage_sync::schedule_layer_upload(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
layer_paths_to_upload,
|
||||
None,
|
||||
);
|
||||
}
|
||||
|
||||
// 3. Compact
|
||||
let timer = self.metrics.compact_time_histo.start_timer();
|
||||
self.compact_level0(target_file_size)?;
|
||||
timer.stop_and_record();
|
||||
}
|
||||
Err(err) => {
|
||||
// no partitioning? This is normal, if the timeline was just created
|
||||
// as an empty timeline. Also in unit tests, when we use the timeline
|
||||
// as a simple key-value store, ignoring the datadir layout. Log the
|
||||
// error but continue.
|
||||
error!("could not compact, repartitioning keyspace failed: {err:?}");
|
||||
}
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn repartition(&self, lsn: Lsn, partition_size: u64) -> Result<(KeyPartitioning, Lsn)> {
|
||||
fn repartition(&self, lsn: Lsn, partition_size: u64) -> anyhow::Result<(KeyPartitioning, Lsn)> {
|
||||
let mut partitioning_guard = self.partitioning.lock().unwrap();
|
||||
if partitioning_guard.1 == Lsn(0)
|
||||
|| lsn.0 - partitioning_guard.1 .0 > self.repartition_threshold
|
||||
@@ -1433,7 +1421,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// Is it time to create a new image layer for the given partition?
|
||||
fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> Result<bool> {
|
||||
fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result<bool> {
|
||||
let layers = self.layers.read().unwrap();
|
||||
|
||||
for part_range in &partition.ranges {
|
||||
@@ -1478,7 +1466,7 @@ impl Timeline {
|
||||
partitioning: &KeyPartitioning,
|
||||
lsn: Lsn,
|
||||
force: bool,
|
||||
) -> Result<HashMap<PathBuf, LayerFileMetadata>> {
|
||||
) -> anyhow::Result<HashMap<PathBuf, LayerFileMetadata>> {
|
||||
let timer = self.metrics.create_images_time_histo.start_timer();
|
||||
let mut image_layers: Vec<ImageLayer> = Vec::new();
|
||||
for partition in partitioning.parts.iter() {
|
||||
@@ -1571,7 +1559,7 @@ impl Timeline {
|
||||
/// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
|
||||
/// as Level 1 files.
|
||||
///
|
||||
fn compact_level0(&self, target_file_size: u64) -> Result<()> {
|
||||
fn compact_level0(&self, target_file_size: u64) -> anyhow::Result<()> {
|
||||
let layers = self.layers.read().unwrap();
|
||||
let mut level0_deltas = layers.get_level0_deltas()?;
|
||||
drop(layers);
|
||||
@@ -1881,12 +1869,12 @@ impl Timeline {
|
||||
///
|
||||
/// The 'pitr' duration is used to calculate a 'pitr_cutoff', which can be used to determine
|
||||
/// whether a record is needed for PITR.
|
||||
pub fn update_gc_info(
|
||||
pub(super) fn update_gc_info(
|
||||
&self,
|
||||
retain_lsns: Vec<Lsn>,
|
||||
cutoff_horizon: Lsn,
|
||||
pitr: Duration,
|
||||
) -> Result<()> {
|
||||
) -> anyhow::Result<()> {
|
||||
let mut gc_info = self.gc_info.write().unwrap();
|
||||
|
||||
gc_info.horizon_cutoff = cutoff_horizon;
|
||||
@@ -1941,7 +1929,7 @@ impl Timeline {
|
||||
/// within a layer file. We can only remove the whole file if it's fully
|
||||
/// obsolete.
|
||||
///
|
||||
pub fn gc(&self) -> Result<GcResult> {
|
||||
pub(super) fn gc(&self) -> anyhow::Result<GcResult> {
|
||||
let mut result: GcResult = Default::default();
|
||||
let now = SystemTime::now();
|
||||
|
||||
@@ -2261,11 +2249,11 @@ impl<'a> TimelineWriter<'a> {
|
||||
///
|
||||
/// This will implicitly extend the relation, if the page is beyond the
|
||||
/// current end-of-file.
|
||||
pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> Result<()> {
|
||||
pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
|
||||
self.tl.put_value(key, lsn, value)
|
||||
}
|
||||
|
||||
pub fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> Result<()> {
|
||||
pub fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
|
||||
self.tl.put_tombstone(key_range, lsn)
|
||||
}
|
||||
|
||||
|
||||
@@ -10,9 +10,6 @@ EXTENSION = neon_test_utils
|
||||
DATA = neon_test_utils--1.0.sql
|
||||
PGFILEDESC = "neon_test_utils - helpers for neon testing and debugging"
|
||||
|
||||
PG_CPPFLAGS = -I$(libpq_srcdir)
|
||||
SHLIB_LINK_INTERNAL = $(libpq)
|
||||
|
||||
PG_CONFIG = pg_config
|
||||
PGXS := $(shell $(PG_CONFIG) --pgxs)
|
||||
include $(PGXS)
|
||||
|
||||
@@ -23,11 +23,6 @@ RETURNS bytea
|
||||
AS 'MODULE_PATHNAME', 'get_raw_page_at_lsn_ex'
|
||||
LANGUAGE C PARALLEL UNSAFE;
|
||||
|
||||
CREATE FUNCTION neon_seqscan_rel(rel regclass, nprefetch int DEFAULT 0)
|
||||
RETURNS void
|
||||
AS 'MODULE_PATHNAME', 'neon_seqscan_rel'
|
||||
LANGUAGE C PARALLEL UNSAFE;
|
||||
|
||||
CREATE FUNCTION neon_xlogflush(lsn pg_lsn)
|
||||
RETURNS VOID
|
||||
AS 'MODULE_PATHNAME', 'neon_xlogflush'
|
||||
|
||||
@@ -23,13 +23,8 @@
|
||||
#include "utils/pg_lsn.h"
|
||||
#include "utils/rel.h"
|
||||
#include "utils/varlena.h"
|
||||
#include "utils/wait_event.h"
|
||||
#include "../neon/pagestore_client.h"
|
||||
|
||||
#include "libpq-fe.h"
|
||||
#include "libpq/pqformat.h"
|
||||
#include "libpq/libpq.h"
|
||||
|
||||
PG_MODULE_MAGIC;
|
||||
|
||||
extern void _PG_init(void);
|
||||
@@ -39,7 +34,6 @@ PG_FUNCTION_INFO_V1(clear_buffer_cache);
|
||||
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn);
|
||||
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn_ex);
|
||||
PG_FUNCTION_INFO_V1(neon_xlogflush);
|
||||
PG_FUNCTION_INFO_V1(neon_seqscan_rel);
|
||||
|
||||
/*
|
||||
* Linkage to functions in neon module.
|
||||
@@ -295,238 +289,6 @@ get_raw_page_at_lsn_ex(PG_FUNCTION_ARGS)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* A wrapper around PQgetCopyData that checks for interrupts while sleeping.
|
||||
*/
|
||||
static int
|
||||
call_PQgetCopyData(PGconn *conn, char **buffer)
|
||||
{
|
||||
int ret;
|
||||
|
||||
retry:
|
||||
ret = PQgetCopyData(conn, buffer, 1 /* async */ );
|
||||
|
||||
if (ret == 0)
|
||||
{
|
||||
int wc;
|
||||
|
||||
/* Sleep until there's something to do */
|
||||
wc = WaitLatchOrSocket(MyLatch,
|
||||
WL_LATCH_SET | WL_SOCKET_READABLE |
|
||||
WL_EXIT_ON_PM_DEATH,
|
||||
PQsocket(conn),
|
||||
-1L, PG_WAIT_EXTENSION);
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
/* Data available in socket? */
|
||||
if (wc & WL_SOCKET_READABLE)
|
||||
{
|
||||
if (!PQconsumeInput(conn))
|
||||
elog(ERROR, "could not get response from pageserver: %s",
|
||||
PQerrorMessage(conn));
|
||||
}
|
||||
|
||||
goto retry;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void send_getpage_request(PGconn *pageserver_conn, RelFileNode rnode, BlockNumber blkno, XLogRecPtr lsn);
|
||||
|
||||
/*
|
||||
* Fetch all pages of given relation. This simulates a sequential scan
|
||||
* over the table. You can specify the number of blocks to prefetch;
|
||||
* the function will try to keep that many requests "in flight" at all
|
||||
* times. The fetched pages are simply discarded.
|
||||
*/
|
||||
Datum
|
||||
neon_seqscan_rel(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Oid relid = PG_GETARG_OID(0);
|
||||
Oid nprefetch = PG_GETARG_INT32(1);
|
||||
Relation rel;
|
||||
char *raw_page_data;
|
||||
BlockNumber nblocks;
|
||||
PGconn *pageserver_conn;
|
||||
XLogRecPtr read_lsn;
|
||||
|
||||
if (!superuser())
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
|
||||
errmsg("must be superuser to use raw page functions")));
|
||||
|
||||
rel = relation_open(relid, AccessShareLock);
|
||||
|
||||
nblocks = RelationGetNumberOfBlocks(rel);
|
||||
|
||||
pageserver_conn = PQconnectdb(page_server_connstring);
|
||||
if (PQstatus(pageserver_conn) == CONNECTION_BAD)
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||
|
||||
PQfinish(pageserver_conn);
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
|
||||
errmsg("could not establish connection to pageserver"),
|
||||
errdetail_internal("%s", msg)));
|
||||
}
|
||||
PG_TRY();
|
||||
{
|
||||
char *query;
|
||||
int ret;
|
||||
StringInfoData resp_buff;
|
||||
|
||||
read_lsn = GetXLogInsertRecPtr();
|
||||
|
||||
query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
|
||||
ret = PQsendQuery(pageserver_conn, query);
|
||||
if (ret != 1)
|
||||
{
|
||||
PQfinish(pageserver_conn);
|
||||
pageserver_conn = NULL;
|
||||
elog(ERROR, "could not send pagestream command to pageserver");
|
||||
}
|
||||
|
||||
while (PQisBusy(pageserver_conn))
|
||||
{
|
||||
int wc;
|
||||
|
||||
/* Sleep until there's something to do */
|
||||
wc = WaitLatchOrSocket(MyLatch,
|
||||
WL_LATCH_SET | WL_SOCKET_READABLE |
|
||||
WL_EXIT_ON_PM_DEATH,
|
||||
PQsocket(pageserver_conn),
|
||||
-1L, PG_WAIT_EXTENSION);
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
/* Data available in socket? */
|
||||
if (wc & WL_SOCKET_READABLE)
|
||||
{
|
||||
if (!PQconsumeInput(pageserver_conn))
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||
|
||||
PQfinish(pageserver_conn);
|
||||
pageserver_conn = NULL;
|
||||
|
||||
elog(ERROR, "could not complete handshake with pageserver: %s",
|
||||
msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
elog(INFO, "scanning %u blocks, prefetch %u", nblocks, nprefetch);
|
||||
|
||||
BlockNumber nsent = 0;
|
||||
for (BlockNumber blkno = 0; blkno < nblocks; blkno++)
|
||||
{
|
||||
NeonGetPageRequest request = {
|
||||
.req.tag = T_NeonGetPageRequest,
|
||||
.req.latest = true,
|
||||
.req.lsn = read_lsn,
|
||||
.rnode = rel->rd_node,
|
||||
.forknum = MAIN_FORKNUM,
|
||||
.blkno = blkno
|
||||
};
|
||||
NeonResponse *resp;
|
||||
|
||||
if (blkno % 1024 == 0)
|
||||
elog(INFO, "blk %u/%u", blkno, nblocks);
|
||||
|
||||
if (nsent < blkno + nprefetch + 1 && nsent < nblocks)
|
||||
{
|
||||
while (nsent < blkno + nprefetch + 1 && nsent < nblocks)
|
||||
send_getpage_request(pageserver_conn, rel->rd_node, nsent++, read_lsn);
|
||||
|
||||
if (PQflush(pageserver_conn))
|
||||
{
|
||||
char *msg = PQerrorMessage(pageserver_conn);
|
||||
|
||||
elog(ERROR, "failed to flush page requests: %s", msg);
|
||||
}
|
||||
}
|
||||
|
||||
/* read response */
|
||||
resp_buff.len = call_PQgetCopyData(pageserver_conn, &resp_buff.data);
|
||||
resp_buff.cursor = 0;
|
||||
|
||||
if (resp_buff.len < 0)
|
||||
{
|
||||
if (resp_buff.len == -1)
|
||||
elog(ERROR, "end of COPY");
|
||||
else if (resp_buff.len == -2)
|
||||
elog(ERROR, "could not read COPY data: %s", PQerrorMessage(pageserver_conn));
|
||||
}
|
||||
resp = nm_unpack_response(&resp_buff);
|
||||
|
||||
switch (resp->tag)
|
||||
{
|
||||
case T_NeonGetPageResponse:
|
||||
/* ok */
|
||||
break;
|
||||
|
||||
case T_NeonErrorResponse:
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_IO_ERROR),
|
||||
errmsg("could not read block %u", blkno),
|
||||
errdetail("page server returned error: %s",
|
||||
((NeonErrorResponse *) resp)->message)));
|
||||
break;
|
||||
|
||||
default:
|
||||
elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
|
||||
}
|
||||
|
||||
PQfreemem(resp_buff.data);
|
||||
}
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
PQfinish(pageserver_conn);
|
||||
PG_RE_THROW();
|
||||
}
|
||||
PG_END_TRY();
|
||||
|
||||
relation_close(rel, AccessShareLock);
|
||||
}
|
||||
|
||||
static void
|
||||
send_getpage_request(PGconn *pageserver_conn, RelFileNode rnode, BlockNumber blkno, XLogRecPtr lsn)
|
||||
{
|
||||
NeonGetPageRequest request = {
|
||||
.req.tag = T_NeonGetPageRequest,
|
||||
.req.latest = true,
|
||||
.req.lsn = lsn,
|
||||
.rnode = rnode,
|
||||
.forknum = MAIN_FORKNUM,
|
||||
.blkno = blkno
|
||||
};
|
||||
StringInfoData req_buff;
|
||||
|
||||
req_buff = nm_pack_request(&request.req);
|
||||
/*
|
||||
* Send request.
|
||||
*
|
||||
* In principle, this could block if the output buffer is full, and we
|
||||
* should use async mode and check for interrupts while waiting. In
|
||||
* practice, our requests are small enough to always fit in the output and
|
||||
* TCP buffer.
|
||||
*/
|
||||
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
|
||||
{
|
||||
char *msg = PQerrorMessage(pageserver_conn);
|
||||
|
||||
elog(ERROR, "failed to send page request: %s", msg);
|
||||
}
|
||||
pfree(req_buff.data);
|
||||
}
|
||||
|
||||
/*
|
||||
* Directly calls XLogFlush(lsn) to flush WAL buffers.
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user