Compare commits

..

6 Commits

19 changed files with 136 additions and 499 deletions

View File

@@ -1,17 +1,3 @@
# The binaries are really slow, if you compile them in 'dev' mode with the defaults.
# Enable some optimizations even in 'dev' mode, to make tests faster. The basic
# optimizations enabled by "opt-level=1" don't affect debuggability too much.
#
# See https://www.reddit.com/r/rust/comments/gvrgca/this_is_a_neat_trick_for_getting_good_runtime/
#
[profile.dev.package."*"]
# Set the default for dependencies in Development mode.
opt-level = 3
[profile.dev]
# Turn on a small amount of optimization in Development mode.
opt-level = 1
[build]
# This is only present for local builds, as it will be overridden
# by the RUSTDOCFLAGS env var in CI.

View File

@@ -852,7 +852,7 @@ jobs:
run:
shell: sh -eu {0}
env:
VM_BUILDER_VERSION: v0.19.0
VM_BUILDER_VERSION: v0.18.5
steps:
- name: Checkout
@@ -874,7 +874,8 @@ jobs:
- name: Build vm image
run: |
./vm-builder \
-spec=vm-image-spec.yaml \
-enable-file-cache \
-cgroup-uid=postgres \
-src=369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} \
-dst=369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}

10
Cargo.lock generated
View File

@@ -3221,7 +3221,7 @@ dependencies = [
[[package]]
name = "postgres"
version = "0.19.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=6ce32f791526e27533cab0232a6bb243b2c32584#6ce32f791526e27533cab0232a6bb243b2c32584"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
dependencies = [
"bytes",
"fallible-iterator",
@@ -3234,7 +3234,7 @@ dependencies = [
[[package]]
name = "postgres-native-tls"
version = "0.5.0"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=6ce32f791526e27533cab0232a6bb243b2c32584#6ce32f791526e27533cab0232a6bb243b2c32584"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
dependencies = [
"native-tls",
"tokio",
@@ -3245,7 +3245,7 @@ dependencies = [
[[package]]
name = "postgres-protocol"
version = "0.6.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=6ce32f791526e27533cab0232a6bb243b2c32584#6ce32f791526e27533cab0232a6bb243b2c32584"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
dependencies = [
"base64 0.20.0",
"byteorder",
@@ -3263,7 +3263,7 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=6ce32f791526e27533cab0232a6bb243b2c32584#6ce32f791526e27533cab0232a6bb243b2c32584"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
dependencies = [
"bytes",
"fallible-iterator",
@@ -4933,7 +4933,7 @@ dependencies = [
[[package]]
name = "tokio-postgres"
version = "0.7.7"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=6ce32f791526e27533cab0232a6bb243b2c32584#6ce32f791526e27533cab0232a6bb243b2c32584"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
dependencies = [
"async-trait",
"byteorder",

View File

@@ -165,11 +165,11 @@ env_logger = "0.10"
log = "0.4"
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="6ce32f791526e27533cab0232a6bb243b2c32584" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="6ce32f791526e27533cab0232a6bb243b2c32584" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="6ce32f791526e27533cab0232a6bb243b2c32584" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="6ce32f791526e27533cab0232a6bb243b2c32584" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="6ce32f791526e27533cab0232a6bb243b2c32584" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
## Other git libraries
heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending
@@ -206,7 +206,7 @@ tonic-build = "0.9"
# This is only needed for proxy's tests.
# TODO: we should probably fork `tokio-postgres-rustls` instead.
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="6ce32f791526e27533cab0232a6bb243b2c32584" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
################# Binary contents sections

View File

@@ -1,4 +1,7 @@
use crate::repository::{key_range_size, singleton_range, Key};
use crate::{
pgdatadir_mapping::{BASEBACKUP_CUT, METADATA_CUT},
repository::{key_range_size, singleton_range, Key},
};
use postgres_ffi::BLCKSZ;
use std::ops::Range;
@@ -22,13 +25,22 @@ impl KeySpace {
let target_nblocks = (target_size / BLCKSZ as u64) as usize;
let mut parts = Vec::new();
let mut current_part = Vec::new();
let mut current_part: Vec<Range<Key>> = Vec::new();
let mut current_part_size: usize = 0;
for range in &self.ranges {
let last = current_part
.last()
.map(|r| r.end)
.unwrap_or(Key::from_i128(0));
let cut_here = (range.start >= METADATA_CUT && last < METADATA_CUT)
|| (range.start >= BASEBACKUP_CUT && last < BASEBACKUP_CUT);
// If appending the next contiguous range in the keyspace to the current
// partition would cause it to be too large, start a new partition.
let this_size = key_range_size(range) as usize;
if current_part_size + this_size > target_nblocks && !current_part.is_empty() {
if cut_here
|| current_part_size + this_size > target_nblocks && !current_part.is_empty()
{
parts.push(KeySpace {
ranges: current_part,
});

View File

@@ -40,9 +40,6 @@ pub enum StorageTimeOperation {
#[strum(serialize = "logical size")]
LogicalSize,
#[strum(serialize = "imitate logical size")]
ImitateLogicalSize,
#[strum(serialize = "load layer map")]
LoadLayerMap,
@@ -1364,7 +1361,6 @@ pub struct TimelineMetrics {
pub compact_time_histo: StorageTimeMetrics,
pub create_images_time_histo: StorageTimeMetrics,
pub logical_size_histo: StorageTimeMetrics,
pub imitate_logical_size_histo: StorageTimeMetrics,
pub load_layer_map_histo: StorageTimeMetrics,
pub garbage_collect_histo: StorageTimeMetrics,
pub last_record_gauge: IntGauge,
@@ -1393,11 +1389,6 @@ impl TimelineMetrics {
StorageTimeMetrics::new(StorageTimeOperation::CreateImages, &tenant_id, &timeline_id);
let logical_size_histo =
StorageTimeMetrics::new(StorageTimeOperation::LogicalSize, &tenant_id, &timeline_id);
let imitate_logical_size_histo = StorageTimeMetrics::new(
StorageTimeOperation::ImitateLogicalSize,
&tenant_id,
&timeline_id,
);
let load_layer_map_histo =
StorageTimeMetrics::new(StorageTimeOperation::LoadLayerMap, &tenant_id, &timeline_id);
let garbage_collect_histo =
@@ -1430,7 +1421,6 @@ impl TimelineMetrics {
compact_time_histo,
create_images_time_histo,
logical_size_histo,
imitate_logical_size_histo,
garbage_collect_histo,
load_layer_map_histo,
last_record_gauge,

View File

@@ -662,20 +662,21 @@ impl Timeline {
ctx: &RequestContext,
) -> Result<KeySpace, CollectKeySpaceError> {
// Iterate through key ranges, greedily packing them into partitions
// This function is responsible for appending keys in order, using implicit
// knowledge of how keys are defined.
let mut result = KeySpaceAccum::new();
// The dbdir metadata always exists
result.add_key(DBDIR_KEY);
// Fetch list of database dirs and iterate them
let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
let dbdir = DbDirectory::des(&buf)?;
let mut metadata_keys = Vec::new();
let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
dbs.sort_unstable();
for (spcnode, dbnode) in dbs {
result.add_key(relmap_file_key(spcnode, dbnode));
result.add_key(rel_dir_to_key(spcnode, dbnode));
metadata_keys.push(relmap_file_key(spcnode, dbnode));
metadata_keys.push(rel_dir_to_key(spcnode, dbnode));
let mut rels: Vec<RelTag> = self
.list_rels(spcnode, dbnode, lsn, ctx)
@@ -689,7 +690,7 @@ impl Timeline {
let relsize = buf.get_u32_le();
result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
result.add_key(relsize_key);
metadata_keys.push(relsize_key);
}
}
@@ -732,6 +733,13 @@ impl Timeline {
if self.get(AUX_FILES_KEY, lsn, ctx).await.is_ok() {
result.add_key(AUX_FILES_KEY);
}
// The dbdir metadata always exists
result.add_key(DBDIR_KEY);
for key in metadata_keys {
result.add_key(key);
}
Ok(result.to_keyspace())
}
@@ -1474,21 +1482,11 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
//
// Below is a full list of the keyspace allocation:
//
// DbDir:
// 00 00000000 00000000 00000000 00 00000000
//
// Filenodemap:
// 00 SPCNODE DBNODE 00000000 00 00000000
//
// RelDir:
// 00 SPCNODE DBNODE 00000000 00 00000001 (Postgres never uses relfilenode 0)
//
// RelBlock:
// 00 SPCNODE DBNODE RELNODE FORK BLKNUM
//
// RelSize:
// 00 SPCNODE DBNODE RELNODE FORK FFFFFFFF
//
// SlruDir:
// 01 kind 00000000 00000000 00 00000000
//
@@ -1513,11 +1511,31 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
// AuxFiles:
// 03 00000000 00000000 00000000 00 00000002
//
// DbDir:
// 04 00000000 00000000 00000000 00 00000000
//
// Filenodemap:
// 04 SPCNODE DBNODE 00000000 00 00000000
//
// RelDir:
// 04 SPCNODE DBNODE 00000000 00 00000001 (Postgres never uses relfilenode 0)
//
// RelSize:
// 04 SPCNODE DBNODE RELNODE FORK FFFFFFFF
//-- Section 01: relation data and metadata
/// Keys above this Key are required to serve a basebackup request
pub(crate) const BASEBACKUP_CUT: Key = slru_dir_to_key(SlruKind::Clog);
/// Keys aboe this Key are needed to make a logical size calculation
///
/// Ensuring that such keys are stored above the main range of user relation
/// blocks enables much more efficient space management.
pub(crate) const METADATA_CUT: Key = CONTROLFILE_KEY;
const DBDIR_KEY: Key = Key {
field1: 0x00,
field1: 0x04,
field2: 0,
field3: 0,
field4: 0,
@@ -1527,14 +1545,14 @@ const DBDIR_KEY: Key = Key {
fn dbdir_key_range(spcnode: Oid, dbnode: Oid) -> Range<Key> {
Key {
field1: 0x00,
field1: 0x04,
field2: spcnode,
field3: dbnode,
field4: 0,
field5: 0,
field6: 0,
}..Key {
field1: 0x00,
field1: 0x04,
field2: spcnode,
field3: dbnode,
field4: 0xffffffff,
@@ -1545,7 +1563,7 @@ fn dbdir_key_range(spcnode: Oid, dbnode: Oid) -> Range<Key> {
fn relmap_file_key(spcnode: Oid, dbnode: Oid) -> Key {
Key {
field1: 0x00,
field1: 0x04,
field2: spcnode,
field3: dbnode,
field4: 0,
@@ -1556,7 +1574,7 @@ fn relmap_file_key(spcnode: Oid, dbnode: Oid) -> Key {
fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key {
Key {
field1: 0x00,
field1: 0x04,
field2: spcnode,
field3: dbnode,
field4: 0,
@@ -1578,7 +1596,7 @@ fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
fn rel_size_to_key(rel: RelTag) -> Key {
Key {
field1: 0x00,
field1: 0x04,
field2: rel.spcnode,
field3: rel.dbnode,
field4: rel.relnode,
@@ -1607,7 +1625,7 @@ fn rel_key_range(rel: RelTag) -> Range<Key> {
//-- Section 02: SLRUs
fn slru_dir_to_key(kind: SlruKind) -> Key {
const fn slru_dir_to_key(kind: SlruKind) -> Key {
Key {
field1: 0x01,
field2: match kind {

View File

@@ -61,7 +61,6 @@ use self::mgr::TenantsMap;
use self::remote_timeline_client::RemoteTimelineClient;
use self::timeline::uninit::TimelineUninitMark;
use self::timeline::uninit::UninitializedTimeline;
use self::timeline::EvictionTaskTenantState;
use self::timeline::TimelineResources;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
@@ -252,8 +251,6 @@ pub struct Tenant {
cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
cached_synthetic_tenant_size: Arc<AtomicU64>,
eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
pub(crate) delete_progress: Arc<tokio::sync::Mutex<DeleteTenantFlow>>,
// Cancellation token fires when we have entered shutdown(). This is a parent of
@@ -2367,7 +2364,6 @@ impl Tenant {
state,
cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
cancel: CancellationToken::default(),
gate: Gate::new(format!("Tenant<{tenant_id}>")),

View File

@@ -647,52 +647,20 @@ impl RemoteTimelineClient {
/// deletion won't actually be performed, until all previously scheduled
/// upload operations, and the index file upload, have completed
/// successfully.
///
/// No work is done if the layers are not present in the remote index. Returns
/// false if no work was done.
pub fn schedule_layer_file_deletion(
self: &Arc<Self>,
names: &[LayerFileName],
) -> anyhow::Result<bool> {
) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
let with_generations =
self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names.iter().cloned());
if with_generations.is_empty() {
// No-op.
Ok(false)
} else {
self.schedule_deletion_of_unlinked0(upload_queue, with_generations);
self.schedule_deletion_of_unlinked0(upload_queue, with_generations);
// Launch the tasks immediately, if possible
self.launch_queued_tasks(upload_queue);
Ok(true)
}
}
/// Schedule layer deletions and wait for them to fully execute.
///
/// This is not the normal way to delete layers: usually deletion is scheduled and
/// left to run in the background. However, during startup in [`crate::tenant::Timeline::load_layer_map`]
/// we may find that there are some layers in the future wrt disk_consistent_lsn,
/// and drop them. This is different to a normal deletion, because we are deleting layers that
/// we may soon re-upload with the same name: it's important that the deletions do not race with
/// those later uploads. So this function includes a full flush of the deletion queue.
///
/// TODO: remote, as we will no longer need this function when we are always running pageservers with
/// generations enabled, because layer keys after a restart will always differ to layers before
/// the restart by their generation suffix.
pub async fn flushing_delete_layers(
self: &Arc<Self>,
names: &[LayerFileName],
) -> anyhow::Result<()> {
if self.schedule_layer_file_deletion(names)? {
self.wait_completion().await?;
self.deletion_queue_client.flush_execute().await?;
}
// Launch the tasks immediately, if possible
self.launch_queued_tasks(upload_queue);
Ok(())
}

View File

@@ -5,7 +5,7 @@ use utils::{
lsn::Lsn,
};
use crate::repository::Key;
use crate::{pgdatadir_mapping::METADATA_CUT, repository::Key};
use super::{DeltaFileName, ImageFileName, LayerFileName};
@@ -49,6 +49,20 @@ impl PersistentLayerDesc {
}
}
/// Does this layer consist exclusively of metadata
/// content such as dbdir & relation sizes? This is a
/// hint that the layer is likely to be small and should
/// not be a candidate for eviction under normal circumstances.
pub fn is_metadata_pages(&self) -> bool {
self.key_range.start >= METADATA_CUT
}
/// Does this layer consist exclusively of content
/// required to serve a basebackup request?
pub fn is_basebackup_pages(&self) -> bool {
self.key_range.start >= METADATA_CUT
}
pub fn short_id(&self) -> impl Display {
self.filename()
}

View File

@@ -81,8 +81,6 @@ use crate::task_mgr::TaskKind;
use crate::ZERO_PAGE;
use self::delete::DeleteTimelineFlow;
pub(super) use self::eviction_task::EvictionTaskTenantState;
use self::eviction_task::EvictionTaskTimelineState;
use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
@@ -298,8 +296,6 @@ pub struct Timeline {
/// timeline is being deleted. If 'true', the timeline has already been deleted.
pub delete_progress: Arc<tokio::sync::Mutex<DeleteTimelineFlow>>,
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
/// Barrier to wait before doing initial logical size calculation. Used only during startup.
initial_logical_size_can_start: Option<completion::Barrier>,
@@ -433,7 +429,6 @@ impl std::fmt::Display for PageReconstructError {
pub enum LogicalSizeCalculationCause {
Initial,
ConsumptionMetricsSyntheticSize,
EvictionTaskImitation,
TenantSizeHandler,
}
@@ -1442,9 +1437,6 @@ impl Timeline {
state,
eviction_task_timeline_state: tokio::sync::Mutex::new(
EvictionTaskTimelineState::default(),
),
delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTimelineFlow::default())),
initial_logical_size_can_start,
@@ -1709,7 +1701,10 @@ impl Timeline {
guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1);
if let Some(rtc) = self.remote_client.as_ref() {
rtc.flushing_delete_layers(&needs_cleanup).await?;
rtc.schedule_layer_file_deletion(&needs_cleanup)?;
rtc.schedule_index_upload_for_file_changes()?;
// Tenant::create_timeline will wait for these uploads to happen before returning, or
// on retry.
}
info!(
@@ -1964,9 +1959,6 @@ impl Timeline {
LogicalSizeCalculationCause::Initial
| LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize
| LogicalSizeCalculationCause::TenantSizeHandler => &self.metrics.logical_size_histo,
LogicalSizeCalculationCause::EvictionTaskImitation => {
&self.metrics.imitate_logical_size_histo
}
};
let timer = storage_time_metrics.start_timer();
let logical_size = self
@@ -2743,18 +2735,18 @@ impl Timeline {
partition_size: u64,
ctx: &RequestContext,
) -> anyhow::Result<(KeyPartitioning, Lsn)> {
{
let partitioning_guard = self.partitioning.lock().unwrap();
let distance = lsn.0 - partitioning_guard.1 .0;
if partitioning_guard.1 != Lsn(0) && distance <= self.repartition_threshold {
debug!(
distance,
threshold = self.repartition_threshold,
"no repartitioning needed"
);
return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
}
}
// {
// let partitioning_guard = self.partitioning.lock().unwrap();
// let distance = lsn.0 - partitioning_guard.1 .0;
// if partitioning_guard.1 != Lsn(0) && distance <= self.repartition_threshold {
// debug!(
// distance,
// threshold = self.repartition_threshold,
// "no repartitioning needed"
// );
// return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
// }
// }
let keyspace = self.collect_keyspace(lsn, ctx).await?;
let partitioning = keyspace.partition(partition_size);
@@ -4282,6 +4274,11 @@ impl Timeline {
let file_size = l.file_size();
max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
// Don't evict small layers required to serve a basebackup
if l.is_basebackup_pages() {
continue;
}
let l = guard.get_from_desc(&l);
let l = match l.keep_resident().await {

View File

@@ -14,7 +14,6 @@
//!
//! See write-up on restart on-demand download spike: <https://gist.github.com/problame/2265bf7b8dc398be834abfead36c76b5>
use std::{
collections::HashMap,
ops::ControlFlow,
sync::Arc,
time::{Duration, SystemTime},
@@ -22,17 +21,15 @@ use std::{
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
use tracing::{debug, error, info, instrument, warn};
use crate::{
context::{DownloadBehavior, RequestContext},
pgdatadir_mapping::CollectKeySpaceError,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
tasks::{BackgroundLoopKind, RateLimitError},
timeline::EvictionError,
LogicalSizeCalculationCause, Tenant,
},
};
@@ -40,16 +37,6 @@ use utils::completion;
use super::Timeline;
#[derive(Default)]
pub struct EvictionTaskTimelineState {
last_layer_access_imitation: Option<tokio::time::Instant>,
}
#[derive(Default)]
pub struct EvictionTaskTenantState {
last_layer_access_imitation: Option<Instant>,
}
impl Timeline {
pub(super) fn launch_eviction_task(
self: &Arc<Self>,
@@ -178,7 +165,6 @@ impl Timeline {
// that were accessed to compute the value in the first place.
// 3. Invalidate the caches at a period of < p.threshold/2, so that the values
// get re-computed from layers, thereby counting towards layer access stats.
// 4. Make the eviction task imitate the layer accesses that typically hit caches.
//
// We follow approach (4) here because in Neon prod deployment:
// - page cache is quite small => high churn => low hit rate
@@ -190,10 +176,6 @@ impl Timeline {
//
// We should probably move to persistent caches in the future, or avoid
// having inactive tenants attached to pageserver in the first place.
match self.imitate_layer_accesses(p, cancel, ctx).await {
ControlFlow::Break(()) => return ControlFlow::Break(()),
ControlFlow::Continue(()) => (),
}
#[allow(dead_code)]
#[derive(Debug, Default)]
@@ -215,6 +197,11 @@ impl Timeline {
let layers = guard.layer_map();
let mut candidates = Vec::new();
for hist_layer in layers.iter_historic_layers() {
// Don't evict the small layers needed to serve a basebackup request.
if hist_layer.is_basebackup_pages() {
continue;
}
let hist_layer = guard.get_from_desc(&hist_layer);
// guard against eviction while we inspect it; it might be that eviction_task and
@@ -310,170 +297,4 @@ impl Timeline {
}
ControlFlow::Continue(())
}
#[instrument(skip_all)]
async fn imitate_layer_accesses(
&self,
p: &EvictionPolicyLayerAccessThreshold,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> ControlFlow<()> {
let mut state = self.eviction_task_timeline_state.lock().await;
// Only do the imitate_layer accesses approximately as often as the threshold. A little
// more frequently, to avoid this period racing with the threshold/period-th eviction iteration.
let inter_imitate_period = p.threshold.checked_sub(p.period).unwrap_or(p.threshold);
match state.last_layer_access_imitation {
Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
_ => {
self.imitate_timeline_cached_layer_accesses(ctx).await;
state.last_layer_access_imitation = Some(tokio::time::Instant::now())
}
}
drop(state);
if cancel.is_cancelled() {
return ControlFlow::Break(());
}
// This task is timeline-scoped, but the synthetic size calculation is tenant-scoped.
// Make one of the tenant's timelines draw the short straw and run the calculation.
// The others wait until the calculation is done so that they take into account the
// imitated accesses that the winner made.
let tenant = match crate::tenant::mgr::get_tenant(self.tenant_id, true) {
Ok(t) => t,
Err(_) => {
return ControlFlow::Break(());
}
};
let mut state = tenant.eviction_task_tenant_state.lock().await;
match state.last_layer_access_imitation {
Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
_ => {
self.imitate_synthetic_size_calculation_worker(&tenant, ctx, cancel)
.await;
state.last_layer_access_imitation = Some(tokio::time::Instant::now());
}
}
drop(state);
if cancel.is_cancelled() {
return ControlFlow::Break(());
}
ControlFlow::Continue(())
}
/// Recompute the values which would cause on-demand downloads during restart.
#[instrument(skip_all)]
async fn imitate_timeline_cached_layer_accesses(&self, ctx: &RequestContext) {
let lsn = self.get_last_record_lsn();
// imitiate on-restart initial logical size
let size = self
.calculate_logical_size(lsn, LogicalSizeCalculationCause::EvictionTaskImitation, ctx)
.instrument(info_span!("calculate_logical_size"))
.await;
match &size {
Ok(_size) => {
// good, don't log it to avoid confusion
}
Err(_) => {
// we have known issues for which we already log this on consumption metrics,
// gc, and compaction. leave logging out for now.
//
// https://github.com/neondatabase/neon/issues/2539
}
}
// imitiate repartiting on first compactation
if let Err(e) = self
.collect_keyspace(lsn, ctx)
.instrument(info_span!("collect_keyspace"))
.await
{
// if this failed, we probably failed logical size because these use the same keys
if size.is_err() {
// ignore, see above comment
} else {
match e {
CollectKeySpaceError::Cancelled => {
// Shutting down, ignore
}
err => {
warn!(
"failed to collect keyspace but succeeded in calculating logical size: {err:#}"
);
}
}
}
}
}
// Imitate the synthetic size calculation done by the consumption_metrics module.
#[instrument(skip_all)]
async fn imitate_synthetic_size_calculation_worker(
&self,
tenant: &Arc<Tenant>,
ctx: &RequestContext,
cancel: &CancellationToken,
) {
if self.conf.metric_collection_endpoint.is_none() {
// We don't start the consumption metrics task if this is not set in the config.
// So, no need to imitate the accesses in that case.
return;
}
// The consumption metrics are collected on a per-tenant basis, by a single
// global background loop.
// It limits the number of synthetic size calculations using the global
// `concurrent_tenant_size_logical_size_queries` semaphore to not overload
// the pageserver. (size calculation is somewhat expensive in terms of CPU and IOs).
//
// If we used that same semaphore here, then we'd compete for the
// same permits, which may impact timeliness of consumption metrics.
// That is a no-go, as consumption metrics are much more important
// than what we do here.
//
// So, we have a separate semaphore, initialized to the same
// number of permits as the `concurrent_tenant_size_logical_size_queries`.
// In the worst, we would have twice the amount of concurrenct size calculations.
// But in practice, the `p.threshold` >> `consumption metric interval`, and
// we spread out the eviction task using `random_init_delay`.
// So, the chance of the worst case is quite low in practice.
// It runs as a per-tenant task, but the eviction_task.rs is per-timeline.
// So, we must coordinate with other with other eviction tasks of this tenant.
let limit = self
.conf
.eviction_task_immitated_concurrent_logical_size_queries
.inner();
let mut throwaway_cache = HashMap::new();
let gather = crate::tenant::size::gather_inputs(
tenant,
limit,
None,
&mut throwaway_cache,
LogicalSizeCalculationCause::EvictionTaskImitation,
ctx,
)
.instrument(info_span!("gather_inputs"));
tokio::select! {
_ = cancel.cancelled() => {}
gather_result = gather => {
match gather_result {
Ok(_) => {},
Err(e) => {
// We don't care about the result, but, if it failed, we should log it,
// since consumption metric might be hitting the cached value and
// thus not encountering this error.
warn!("failed to imitate synthetic size calculation accesses: {e:#}")
}
}
}
}
}
}

View File

@@ -1687,9 +1687,9 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
if (current_size >= ((uint64) max_cluster_size) * 1024 * 1024)
ereport(ERROR,
(errcode(ERRCODE_DISK_FULL),
errmsg("could not extend file because project size limit (%d MB) has been exceeded",
errmsg("could not extend file because cluster size limit (%d MB) has been exceeded",
max_cluster_size),
errhint("This limit is defined externally by the project size limit, and internally by neon.max_cluster_size GUC")));
errhint("This limit is defined by neon.max_cluster_size GUC")));
}
/*

View File

@@ -1,9 +1,7 @@
//! User credentials used in authentication.
use crate::{
auth::password_hack::parse_endpoint_param,
error::UserFacingError,
proxy::{neon_options, NUM_CONNECTION_ACCEPTED_BY_SNI},
auth::password_hack::parse_endpoint_param, error::UserFacingError, proxy::neon_options,
};
use itertools::Itertools;
use pq_proto::StartupMessageParams;
@@ -126,22 +124,6 @@ impl<'a> ClientCredentials<'a> {
.transpose()?;
info!(user, project = project.as_deref(), "credentials");
if sni.is_some() {
info!("Connection with sni");
NUM_CONNECTION_ACCEPTED_BY_SNI
.with_label_values(&["sni"])
.inc();
} else if project.is_some() {
NUM_CONNECTION_ACCEPTED_BY_SNI
.with_label_values(&["no_sni"])
.inc();
info!("Connection without sni");
} else {
NUM_CONNECTION_ACCEPTED_BY_SNI
.with_label_values(&["password_hack"])
.inc();
info!("Connection with password hack");
}
let cache_key = format!(
"{}{}",

View File

@@ -248,7 +248,6 @@ impl ConnCfg {
// connect_raw() will not use TLS if sslmode is "disable"
let (client, connection) = self.0.connect_raw(stream, tls).await?;
tracing::Span::current().record("pid", &tracing::field::display(client.get_process_id()));
let stream = connection.stream.into_inner();
info!(

View File

@@ -129,15 +129,6 @@ pub static RATE_LIMITER_LIMIT: Lazy<IntGaugeVec> = Lazy::new(|| {
.unwrap()
});
pub static NUM_CONNECTION_ACCEPTED_BY_SNI: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_accepted_connections_by_sni",
"Number of connections (per sni).",
&["kind"],
)
.unwrap()
});
pub struct LatencyTimer {
// time since the stopwatch was started
start: Option<Instant>,
@@ -514,7 +505,7 @@ pub fn invalidate_cache(node_info: console::CachedNodeInfo) -> compute::ConnCfg
}
/// Try to connect to the compute node once.
#[tracing::instrument(name = "connect_once", fields(pid = tracing::field::Empty), skip_all)]
#[tracing::instrument(name = "connect_once", skip_all)]
async fn connect_to_compute_once(
node_info: &console::CachedNodeInfo,
timeout: time::Duration,

View File

@@ -208,13 +208,14 @@ impl GlobalConnPool {
} else {
info!("pool: reusing connection '{conn_info}'");
client.session.send(session_id)?;
tracing::Span::current().record(
"pid",
&tracing::field::display(client.inner.get_process_id()),
);
latency_timer.pool_hit();
latency_timer.success();
return Ok(Client::new(client, pool).await);
return Ok(Client {
conn_id: client.conn_id,
inner: Some(client),
span: Span::current(),
pool,
});
}
} else {
let conn_id = uuid::Uuid::new_v4();
@@ -228,12 +229,6 @@ impl GlobalConnPool {
)
.await
};
if let Ok(client) = &new_client {
tracing::Span::current().record(
"pid",
&tracing::field::display(client.inner.get_process_id()),
);
}
match &new_client {
// clear the hash. it's no longer valid
@@ -267,8 +262,13 @@ impl GlobalConnPool {
}
_ => {}
}
let new_client = new_client?;
Ok(Client::new(new_client, pool).await)
new_client.map(|inner| Client {
conn_id: inner.conn_id,
inner: Some(inner),
span: Span::current(),
pool,
})
}
fn put(&self, conn_info: &ConnInfo, client: ClientInner) -> anyhow::Result<()> {
@@ -394,7 +394,7 @@ impl ConnectMechanism for TokioMechanism<'_> {
// Wake up the destination if needed. Code here is a bit involved because
// we reuse the code from the usual proxy and we need to prepare few structures
// that this code expects.
#[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
#[tracing::instrument(skip_all)]
async fn connect_to_compute(
config: &config::ProxyConfig,
conn_info: &ConnInfo,
@@ -461,7 +461,6 @@ async fn connect_to_compute_once(
.connect_timeout(timeout)
.connect(tokio_postgres::NoTls)
.await?;
tracing::Span::current().record("pid", &tracing::field::display(client.get_process_id()));
let (tx, mut rx) = tokio::sync::watch::channel(session);
@@ -548,17 +547,6 @@ pub struct Discard<'a> {
}
impl Client {
pub(self) async fn new(
inner: ClientInner,
pool: Option<(ConnInfo, Arc<GlobalConnPool>)>,
) -> Self {
Self {
conn_id: inner.conn_id,
inner: Some(inner),
span: Span::current(),
pool,
}
}
pub fn inner(&mut self) -> (&mut tokio_postgres::Client, Discard<'_>) {
let Self {
inner,

View File

@@ -250,7 +250,7 @@ pub async fn handle(
Ok(response)
}
#[instrument(name = "sql-over-http", fields(pid = tracing::field::Empty), skip_all)]
#[instrument(name = "sql-over-http", skip_all)]
async fn handle_inner(
request: Request<Body>,
sni_hostname: Option<String>,

View File

@@ -1,126 +0,0 @@
# Supplemental file for neondatabase/autoscaling's vm-builder, for producing the VM compute image.
---
commands:
- name: cgconfigparser
user: root
sysvInitAction: sysinit
shell: 'cgconfigparser -l /etc/cgconfig.conf -s 1664'
- name: pgbouncer
user: nobody
sysvInitAction: respawn
shell: '/usr/local/bin/pgbouncer /etc/pgbouncer.ini'
- name: postgres-exporter
user: nobody
sysvInitAction: respawn
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres" /bin/postgres_exporter'
shutdownHook: |
su -p postgres --session-command '/usr/local/bin/pg_ctl stop -D /var/db/postgres/compute/pgdata -m fast --wait -t 10'
files:
- filename: pgbouncer.ini
content: |
[databases]
*=host=localhost port=5432 auth_user=cloud_admin
[pgbouncer]
listen_port=6432
listen_addr=0.0.0.0
auth_type=scram-sha-256
auth_user=cloud_admin
auth_dbname=postgres
client_tls_sslmode=disable
server_tls_sslmode=disable
pool_mode=transaction
max_client_conn=10000
default_pool_size=16
max_prepared_statements=0
- filename: cgconfig.conf
content: |
# Configuration for cgroups in VM compute nodes
group neon-postgres {
perm {
admin {
uid = postgres;
}
task {
gid = users;
}
}
memory {}
}
build: |
# Build cgroup-tools
#
# At time of writing (2023-03-14), debian bullseye has a version of cgroup-tools (technically
# libcgroup) that doesn't support cgroup v2 (version 0.41-11). Unfortunately, the vm-monitor
# requires cgroup v2, so we'll build cgroup-tools ourselves.
FROM debian:bullseye-slim as libcgroup-builder
ENV LIBCGROUP_VERSION v2.0.3
RUN set -exu \
&& apt update \
&& apt install --no-install-recommends -y \
git \
ca-certificates \
automake \
cmake \
make \
gcc \
byacc \
flex \
libtool \
libpam0g-dev \
&& git clone --depth 1 -b $LIBCGROUP_VERSION https://github.com/libcgroup/libcgroup \
&& INSTALL_DIR="/libcgroup-install" \
&& mkdir -p "$INSTALL_DIR/bin" "$INSTALL_DIR/include" \
&& cd libcgroup \
# extracted from bootstrap.sh, with modified flags:
&& (test -d m4 || mkdir m4) \
&& autoreconf -fi \
&& rm -rf autom4te.cache \
&& CFLAGS="-O3" ./configure --prefix="$INSTALL_DIR" --sysconfdir=/etc --localstatedir=/var --enable-opaque-hierarchy="name=systemd" \
# actually build the thing...
&& make install
FROM quay.io/prometheuscommunity/postgres-exporter:v0.12.0 AS postgres-exporter
# Build pgbouncer
#
FROM debian:bullseye-slim AS pgbouncer
RUN set -e \
&& apt-get update \
&& apt-get install -y \
curl \
build-essential \
pkg-config \
libevent-dev \
libssl-dev
ENV PGBOUNCER_VERSION 1.21.0
ENV PGBOUNCER_GITPATH 1_21_0
RUN set -e \
&& curl -sfSL https://github.com/pgbouncer/pgbouncer/releases/download/pgbouncer_${PGBOUNCER_GITPATH}/pgbouncer-${PGBOUNCER_VERSION}.tar.gz -o pgbouncer-${PGBOUNCER_VERSION}.tar.gz \
&& tar xzvf pgbouncer-${PGBOUNCER_VERSION}.tar.gz \
&& cd pgbouncer-${PGBOUNCER_VERSION} \
&& LDFLAGS=-static ./configure --prefix=/usr/local/pgbouncer --without-openssl \
&& make -j $(nproc) \
&& make install
merge: |
# tweak nofile limits
RUN set -e \
&& echo 'fs.file-max = 1048576' >>/etc/sysctl.conf \
&& test ! -e /etc/security || ( \
echo '* - nofile 1048576' >>/etc/security/limits.conf \
&& echo 'root - nofile 1048576' >>/etc/security/limits.conf \
)
COPY cgconfig.conf /etc/cgconfig.conf
COPY pgbouncer.ini /etc/pgbouncer.ini
RUN set -e \
&& chown postgres:postgres /etc/pgbouncer.ini \
&& chmod 0644 /etc/pgbouncer.ini \
&& chmod 0644 /etc/cgconfig.conf
COPY --from=libcgroup-builder /libcgroup-install/bin/* /usr/bin/
COPY --from=libcgroup-builder /libcgroup-install/lib/* /usr/lib/
COPY --from=libcgroup-builder /libcgroup-install/sbin/* /usr/sbin/
COPY --from=postgres-exporter /bin/postgres_exporter /bin/postgres_exporter
COPY --from=pgbouncer /usr/local/pgbouncer/bin/pgbouncer /usr/local/bin/pgbouncer