Merge remote-tracking branch 'origin/main' into HEAD

This commit is contained in:
Heikki Linnakangas
2025-07-12 16:43:57 +03:00
61 changed files with 1448 additions and 448 deletions

8
.gitmodules vendored
View File

@@ -1,16 +1,16 @@
[submodule "vendor/postgres-v14"]
path = vendor/postgres-v14
url = https://github.com/neondatabase/postgres.git
url = ../postgres.git
branch = REL_14_STABLE_neon
[submodule "vendor/postgres-v15"]
path = vendor/postgres-v15
url = https://github.com/neondatabase/postgres.git
url = ../postgres.git
branch = REL_15_STABLE_neon
[submodule "vendor/postgres-v16"]
path = vendor/postgres-v16
url = https://github.com/neondatabase/postgres.git
url = ../postgres.git
branch = REL_16_STABLE_neon
[submodule "vendor/postgres-v17"]
path = vendor/postgres-v17
url = https://github.com/neondatabase/postgres.git
url = ../postgres.git
branch = REL_17_STABLE_neon

3
Cargo.lock generated
View File

@@ -5440,6 +5440,7 @@ dependencies = [
"async-trait",
"atomic-take",
"aws-config",
"aws-credential-types",
"aws-sdk-iam",
"aws-sigv4",
"base64 0.22.1",
@@ -5479,6 +5480,7 @@ dependencies = [
"itoa",
"jose-jwa",
"jose-jwk",
"json",
"lasso",
"measured",
"metrics",
@@ -7196,6 +7198,7 @@ dependencies = [
"pageserver_api",
"pageserver_client",
"reqwest",
"safekeeper_api",
"serde_json",
"storage_controller_client",
"tokio",

View File

@@ -1067,6 +1067,8 @@ impl ComputeNode {
self.try_get_basebackup_libpq(spec, lsn)?
};
self.fix_zenith_signal_neon_signal()?;
let mut state = self.state.lock().unwrap();
state.metrics.pageserver_connect_micros =
connected.duration_since(started).as_micros() as u64;
@@ -1076,6 +1078,27 @@ impl ComputeNode {
Ok(())
}
/// Move the Zenith signal file to Neon signal file location.
/// This makes Compute compatible with older PageServers that don't yet
/// know about the Zenith->Neon rename.
fn fix_zenith_signal_neon_signal(&self) -> Result<()> {
let datadir = Path::new(&self.params.pgdata);
let neonsig = datadir.join("neon.signal");
if neonsig.is_file() {
return Ok(());
}
let zenithsig = datadir.join("zenith.signal");
if zenithsig.is_file() {
fs::copy(zenithsig, neonsig)?;
}
Ok(())
}
/// Fetches a basebackup via gRPC. The connstring must use grpc://. Returns the timestamp when
/// the connection was established, and the (compressed) size of the basebackup.
fn try_get_basebackup_grpc(&self, spec: &ParsedSpec, lsn: Lsn) -> Result<(Instant, usize)> {

View File

@@ -36,7 +36,7 @@ impl StorageBroker {
pub async fn start(&self, retry_timeout: &Duration) -> anyhow::Result<()> {
let broker = &self.env.broker;
print!("Starting neon broker at {}", broker.client_url());
println!("Starting neon broker at {}", broker.client_url());
let mut args = Vec::new();

View File

@@ -32,7 +32,8 @@
//! config.json - passed to `compute_ctl`
//! pgdata/
//! postgresql.conf - copy of postgresql.conf created by `compute_ctl`
//! zenith.signal
//! neon.signal
//! zenith.signal - copy of neon.signal, for backward compatibility
//! <other PostgreSQL files>
//! ```
//!

View File

@@ -217,6 +217,9 @@ pub struct NeonStorageControllerConf {
pub posthog_config: Option<PostHogConfig>,
pub kick_secondary_downloads: Option<bool>,
#[serde(with = "humantime_serde")]
pub shard_split_request_timeout: Option<Duration>,
}
impl NeonStorageControllerConf {
@@ -250,6 +253,7 @@ impl Default for NeonStorageControllerConf {
timeline_safekeeper_count: None,
posthog_config: None,
kick_secondary_downloads: None,
shard_split_request_timeout: None,
}
}
}

View File

@@ -303,7 +303,7 @@ impl PageServerNode {
async fn start_node(&self, retry_timeout: &Duration) -> anyhow::Result<()> {
// TODO: using a thread here because start_process() is not async but we need to call check_status()
let datadir = self.repo_path();
print!(
println!(
"Starting pageserver node {} at '{}' in {:?}, retrying for {:?}",
self.conf.id,
self.pg_connection_config.raw_address(),

View File

@@ -127,7 +127,7 @@ impl SafekeeperNode {
extra_opts: &[String],
retry_timeout: &Duration,
) -> anyhow::Result<()> {
print!(
println!(
"Starting safekeeper at '{}' in '{}', retrying for {:?}",
self.pg_connection_config.raw_address(),
self.datadir_path().display(),

View File

@@ -648,6 +648,13 @@ impl StorageController {
args.push(format!("--timeline-safekeeper-count={sk_cnt}"));
}
if let Some(duration) = self.config.shard_split_request_timeout {
args.push(format!(
"--shard-split-request-timeout={}",
humantime::Duration::from(duration)
));
}
let mut envs = vec![
("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
@@ -660,7 +667,7 @@ impl StorageController {
));
}
println!("Starting storage controller");
println!("Starting storage controller at {scheme}://{host}:{listen_port}");
background_process::start_process(
COMMAND,

View File

@@ -14,6 +14,7 @@ humantime.workspace = true
pageserver_api.workspace = true
pageserver_client.workspace = true
reqwest.workspace = true
safekeeper_api.workspace=true
serde_json = { workspace = true, features = ["raw_value"] }
storage_controller_client.workspace = true
tokio.workspace = true

View File

@@ -11,7 +11,7 @@ use pageserver_api::controller_api::{
PlacementPolicy, SafekeeperDescribeResponse, SafekeeperSchedulingPolicyRequest,
ShardSchedulingPolicy, ShardsPreferredAzsRequest, ShardsPreferredAzsResponse,
SkSchedulingPolicy, TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
TenantShardMigrateRequest, TenantShardMigrateResponse,
TenantShardMigrateRequest, TenantShardMigrateResponse, TimelineSafekeeperMigrateRequest,
};
use pageserver_api::models::{
EvictionPolicy, EvictionPolicyLayerAccessThreshold, ShardParameters, TenantConfig,
@@ -21,6 +21,7 @@ use pageserver_api::models::{
use pageserver_api::shard::{ShardStripeSize, TenantShardId};
use pageserver_client::mgmt_api::{self};
use reqwest::{Certificate, Method, StatusCode, Url};
use safekeeper_api::models::TimelineLocateResponse;
use storage_controller_client::control_api::Client;
use utils::id::{NodeId, TenantId, TimelineId};
@@ -279,6 +280,23 @@ enum Command {
#[arg(long)]
concurrency: Option<usize>,
},
/// Locate safekeepers for a timeline from the storcon DB.
TimelineLocate {
#[arg(long)]
tenant_id: TenantId,
#[arg(long)]
timeline_id: TimelineId,
},
/// Migrate a timeline to a new set of safekeepers
TimelineSafekeeperMigrate {
#[arg(long)]
tenant_id: TenantId,
#[arg(long)]
timeline_id: TimelineId,
/// Example: --new-sk-set 1,2,3
#[arg(long, required = true, value_delimiter = ',')]
new_sk_set: Vec<NodeId>,
},
}
#[derive(Parser)]
@@ -1324,7 +1342,7 @@ async fn main() -> anyhow::Result<()> {
concurrency,
} => {
let mut path = format!(
"/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers",
"v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers",
);
if let Some(c) = concurrency {
@@ -1335,6 +1353,41 @@ async fn main() -> anyhow::Result<()> {
.dispatch::<(), ()>(Method::POST, path, None)
.await?;
}
Command::TimelineLocate {
tenant_id,
timeline_id,
} => {
let path = format!("debug/v1/tenant/{tenant_id}/timeline/{timeline_id}/locate");
let resp = storcon_client
.dispatch::<(), TimelineLocateResponse>(Method::GET, path, None)
.await?;
let sk_set = resp.sk_set.iter().map(|id| id.0 as i64).collect::<Vec<_>>();
let new_sk_set = resp
.new_sk_set
.as_ref()
.map(|ids| ids.iter().map(|id| id.0 as i64).collect::<Vec<_>>());
println!("generation = {}", resp.generation);
println!("sk_set = {sk_set:?}");
println!("new_sk_set = {new_sk_set:?}");
}
Command::TimelineSafekeeperMigrate {
tenant_id,
timeline_id,
new_sk_set,
} => {
let path = format!("v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate");
storcon_client
.dispatch::<_, ()>(
Method::POST,
path,
Some(TimelineSafekeeperMigrateRequest { new_sk_set }),
)
.await?;
}
}
Ok(())

View File

@@ -129,9 +129,10 @@ segment to bootstrap the WAL writing, but it doesn't contain the checkpoint reco
changes in xlog.c, to allow starting the compute node without reading the last checkpoint record
from WAL.
This includes code to read the `zenith.signal` file, which tells the startup code the LSN to start
at. When the `zenith.signal` file is present, the startup uses that LSN instead of the last
checkpoint's LSN. The system is known to be consistent at that LSN, without any WAL redo.
This includes code to read the `neon.signal` (also `zenith.signal`) file, which tells the startup
code the LSN to start at. When the `neon.signal` file is present, the startup uses that LSN
instead of the last checkpoint's LSN. The system is known to be consistent at that LSN, without
any WAL redo.
### How to get rid of the patch

View File

@@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize};
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
use crate::models::{PageserverUtilization, ShardParameters, TenantConfig};
use crate::models::{PageserverUtilization, ShardParameters, TenantConfig, TimelineInfo};
use crate::shard::{ShardStripeSize, TenantShardId};
#[derive(Serialize, Deserialize, Debug)]
@@ -126,6 +126,13 @@ pub struct TenantDescribeResponse {
pub config: TenantConfig,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantTimelineDescribeResponse {
pub shards: Vec<TimelineInfo>,
#[serde(skip_serializing_if = "Option::is_none")]
pub image_consistent_lsn: Option<Lsn>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct NodeShardResponse {
pub node_id: NodeId,

View File

@@ -1622,6 +1622,9 @@ pub struct TimelineInfo {
/// Whether the timeline is invisible in synthetic size calculations.
pub is_invisible: Option<bool>,
// HADRON: the largest LSN below which all page updates have been included in the image layers.
#[serde(skip_serializing_if = "Option::is_none")]
pub image_consistent_lsn: Option<Lsn>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]

View File

@@ -31,6 +31,7 @@ pub struct UnreliableWrapper {
/* BEGIN_HADRON */
// This the probability of failure for each operation, ranged from [0, 100].
// The probability is default to 100, which means that all operations will fail.
// Storage will fail by probability up to attempts_to_fail times.
attempt_failure_probability: u64,
/* END_HADRON */
}

View File

@@ -11,7 +11,7 @@ use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
use utils::pageserver_feedback::PageserverFeedback;
use crate::membership::Configuration;
use crate::membership::{Configuration, SafekeeperGeneration};
use crate::{ServerInfo, Term};
#[derive(Debug, Serialize, Deserialize)]
@@ -311,3 +311,12 @@ pub struct PullTimelineResponse {
pub safekeeper_host: Option<String>,
// TODO: add more fields?
}
/// Response to a timeline locate request.
/// Storcon-only API.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct TimelineLocateResponse {
pub generation: SafekeeperGeneration,
pub sk_set: Vec<NodeId>,
pub new_sk_set: Option<Vec<NodeId>>,
}

View File

@@ -47,6 +47,7 @@ where
/* BEGIN_HADRON */
pub enum DeploymentMode {
Local,
Dev,
Staging,
Prod,
@@ -64,7 +65,7 @@ pub fn get_deployment_mode() -> Option<DeploymentMode> {
}
},
Err(_) => {
tracing::error!("DEPLOYMENT_MODE not set");
// tracing::error!("DEPLOYMENT_MODE not set");
None
}
}

View File

@@ -114,7 +114,7 @@ where
// Compute postgres doesn't have any previous WAL files, but the first
// record that it's going to write needs to include the LSN of the
// previous record (xl_prev). We include prev_record_lsn in the
// "zenith.signal" file, so that postgres can read it during startup.
// "neon.signal" file, so that postgres can read it during startup.
//
// We don't keep full history of record boundaries in the page server,
// however, only the predecessor of the latest record on each
@@ -751,34 +751,39 @@ where
//
// Add generated pg_control file and bootstrap WAL segment.
// Also send zenith.signal file with extra bootstrap data.
// Also send neon.signal and zenith.signal file with extra bootstrap data.
//
async fn add_pgcontrol_file(
&mut self,
pg_control_bytes: Bytes,
system_identifier: u64,
) -> Result<(), BasebackupError> {
// add zenith.signal file
let mut zenith_signal = String::new();
// add neon.signal file
let mut neon_signal = String::new();
if self.prev_record_lsn == Lsn(0) {
if self.timeline.is_ancestor_lsn(self.lsn) {
write!(zenith_signal, "PREV LSN: none")
write!(neon_signal, "PREV LSN: none")
.map_err(|e| BasebackupError::Server(e.into()))?;
} else {
write!(zenith_signal, "PREV LSN: invalid")
write!(neon_signal, "PREV LSN: invalid")
.map_err(|e| BasebackupError::Server(e.into()))?;
}
} else {
write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)
write!(neon_signal, "PREV LSN: {}", self.prev_record_lsn)
.map_err(|e| BasebackupError::Server(e.into()))?;
}
self.ar
.append(
&new_tar_header("zenith.signal", zenith_signal.len() as u64)?,
zenith_signal.as_bytes(),
)
.await
.map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,zenith.signal"))?;
// TODO: Remove zenith.signal once all historical computes have been replaced
// ... and thus support the neon.signal file.
for signalfilename in ["neon.signal", "zenith.signal"] {
self.ar
.append(
&new_tar_header(signalfilename, neon_signal.len() as u64)?,
neon_signal.as_bytes(),
)
.await
.map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,neon.signal"))?;
}
//send pg_control
let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?;

View File

@@ -917,11 +917,6 @@ async fn create_remote_storage_client(
// If `test_remote_failures` is non-zero, wrap the client with a
// wrapper that simulates failures.
if conf.test_remote_failures > 0 {
if !cfg!(feature = "testing") {
anyhow::bail!(
"test_remote_failures option is not available because pageserver was compiled without the 'testing' feature"
);
}
info!(
"Simulating remote failures for first {} attempts of each op",
conf.test_remote_failures

View File

@@ -397,6 +397,7 @@ async fn build_timeline_info(
timeline: &Arc<Timeline>,
include_non_incremental_logical_size: bool,
force_await_initial_logical_size: bool,
include_image_consistent_lsn: bool,
ctx: &RequestContext,
) -> anyhow::Result<TimelineInfo> {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
@@ -421,6 +422,10 @@ async fn build_timeline_info(
.await?,
);
}
// HADRON
if include_image_consistent_lsn {
info.image_consistent_lsn = Some(timeline.compute_image_consistent_lsn().await?);
}
Ok(info)
}
@@ -510,6 +515,8 @@ async fn build_timeline_info_common(
is_invisible: Some(is_invisible),
walreceiver_status,
// HADRON
image_consistent_lsn: None,
};
Ok(info)
}
@@ -712,6 +719,8 @@ async fn timeline_list_handler(
parse_query_param(&request, "include-non-incremental-logical-size")?;
let force_await_initial_logical_size: Option<bool> =
parse_query_param(&request, "force-await-initial-logical-size")?;
let include_image_consistent_lsn: Option<bool> =
parse_query_param(&request, "include-image-consistent-lsn")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
@@ -732,6 +741,7 @@ async fn timeline_list_handler(
&timeline,
include_non_incremental_logical_size.unwrap_or(false),
force_await_initial_logical_size.unwrap_or(false),
include_image_consistent_lsn.unwrap_or(false),
&ctx,
)
.instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
@@ -760,6 +770,9 @@ async fn timeline_and_offloaded_list_handler(
parse_query_param(&request, "include-non-incremental-logical-size")?;
let force_await_initial_logical_size: Option<bool> =
parse_query_param(&request, "force-await-initial-logical-size")?;
let include_image_consistent_lsn: Option<bool> =
parse_query_param(&request, "include-image-consistent-lsn")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
@@ -780,6 +793,7 @@ async fn timeline_and_offloaded_list_handler(
&timeline,
include_non_incremental_logical_size.unwrap_or(false),
force_await_initial_logical_size.unwrap_or(false),
include_image_consistent_lsn.unwrap_or(false),
&ctx,
)
.instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
@@ -964,6 +978,9 @@ async fn timeline_detail_handler(
parse_query_param(&request, "include-non-incremental-logical-size")?;
let force_await_initial_logical_size: Option<bool> =
parse_query_param(&request, "force-await-initial-logical-size")?;
// HADRON
let include_image_consistent_lsn: Option<bool> =
parse_query_param(&request, "include-image-consistent-lsn")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
// Logical size calculation needs downloading.
@@ -984,6 +1001,7 @@ async fn timeline_detail_handler(
&timeline,
include_non_incremental_logical_size.unwrap_or(false),
force_await_initial_logical_size.unwrap_or(false),
include_image_consistent_lsn.unwrap_or(false),
ctx,
)
.await
@@ -3643,6 +3661,7 @@ async fn activate_post_import_handler(
let timeline_info = build_timeline_info(
&timeline, false, // include_non_incremental_logical_size,
false, // force_await_initial_logical_size
false, // include_image_consistent_lsn
&ctx,
)
.await
@@ -4164,7 +4183,7 @@ pub fn make_router(
})
.get(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/getpage",
|r| testing_api_handler("getpage@lsn", r, getpage_at_lsn_handler),
|r| testing_api_handler("getpage@lsn", r, getpage_at_lsn_handler),
)
.get(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/touchpage",

View File

@@ -610,13 +610,13 @@ async fn import_file(
debug!("imported twophase file");
} else if file_path.starts_with("pg_wal") {
debug!("found wal file in base section. ignore it");
} else if file_path.starts_with("zenith.signal") {
} else if file_path.starts_with("zenith.signal") || file_path.starts_with("neon.signal") {
// Parse zenith signal file to set correct previous LSN
let bytes = read_all_bytes(reader).await?;
// zenith.signal format is "PREV LSN: prev_lsn"
// neon.signal format is "PREV LSN: prev_lsn"
// TODO write serialization and deserialization in the same place.
let zenith_signal = std::str::from_utf8(&bytes)?.trim();
let prev_lsn = match zenith_signal {
let neon_signal = std::str::from_utf8(&bytes)?.trim();
let prev_lsn = match neon_signal {
"PREV LSN: none" => Lsn(0),
"PREV LSN: invalid" => Lsn(0),
other => {
@@ -624,17 +624,17 @@ async fn import_file(
split[1]
.trim()
.parse::<Lsn>()
.context("can't parse zenith.signal")?
.context("can't parse neon.signal")?
}
};
// zenith.signal is not necessarily the last file, that we handle
// neon.signal is not necessarily the last file, that we handle
// but it is ok to call `finish_write()`, because final `modification.commit()`
// will update lsn once more to the final one.
let writer = modification.tline.writer().await;
writer.finish_write(prev_lsn);
debug!("imported zenith signal {}", prev_lsn);
debug!("imported neon signal {}", prev_lsn);
} else if file_path.starts_with("pg_tblspc") {
// TODO Backups exported from neon won't have pg_tblspc, but we will need
// this to import arbitrary postgres databases.

View File

@@ -12816,6 +12816,40 @@ mod tests {
},
]
);
Ok(())
}
#[tokio::test]
async fn test_get_force_image_creation_lsn() -> anyhow::Result<()> {
let tenant_conf = pageserver_api::models::TenantConfig {
pitr_interval: Some(Duration::from_secs(7 * 3600)),
image_layer_force_creation_period: Some(Duration::from_secs(3600)),
..Default::default()
};
let tenant_id = TenantId::generate();
let harness = TenantHarness::create_custom(
"test_get_force_image_creation_lsn",
tenant_conf,
tenant_id,
ShardIdentity::unsharded(),
Generation::new(1),
)
.await?;
let (tenant, ctx) = harness.load().await;
let timeline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
timeline.gc_info.write().unwrap().cutoffs.time = Some(Lsn(100));
{
let writer = timeline.writer().await;
writer.finish_write(Lsn(5000));
}
let image_creation_lsn = timeline.get_force_image_creation_lsn().unwrap();
assert_eq!(image_creation_lsn, Lsn(4300));
Ok(())
}
}

View File

@@ -46,10 +46,11 @@
mod historic_layer_coverage;
mod layer_coverage;
use std::collections::{HashMap, VecDeque};
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::iter::Peekable;
use std::ops::Range;
use std::sync::Arc;
use std::time::Instant;
use anyhow::Result;
use historic_layer_coverage::BufferedHistoricLayerCoverage;
@@ -904,6 +905,103 @@ impl LayerMap {
max_stacked_deltas
}
/* BEGIN_HADRON */
/**
* Compute the image consistent LSN, the largest LSN below which all pages have been redone successfully.
* It works by first finding the latest image layers and store them into a map. Then for each delta layer,
* find all overlapping image layers in order to potentially increase the image LSN in case there are gaps
* (e.g., if an image is created at LSN 100 but the delta layer spans LSN [150, 200], then we can increase
* image LSN to 150 because there is no WAL record in between).
* Finally, the image consistent LSN is computed by taking the minimum of all image layers.
*/
pub fn compute_image_consistent_lsn(&self, disk_consistent_lsn: Lsn) -> Lsn {
struct ImageLayerInfo {
// creation LSN of the image layer
image_lsn: Lsn,
// the current minimum LSN of newer delta layers with overlapping key ranges
min_delta_lsn: Lsn,
}
let started_at = Instant::now();
let min_l0_deltas_lsn = {
let l0_deltas = self.level0_deltas();
l0_deltas
.iter()
.map(|layer| layer.get_lsn_range().start)
.min()
.unwrap_or(disk_consistent_lsn)
};
let global_key_range = Key::MIN..Key::MAX;
// step 1: collect all most recent image layers into a map
// map: end key to image_layer_info
let mut image_map: BTreeMap<Key, ImageLayerInfo> = BTreeMap::new();
for (img_range, img) in self.image_coverage(&global_key_range, disk_consistent_lsn) {
let img_lsn = img.map(|layer| layer.get_lsn_range().end).unwrap_or(Lsn(0));
image_map.insert(
img_range.end,
ImageLayerInfo {
image_lsn: img_lsn,
min_delta_lsn: min_l0_deltas_lsn,
},
);
}
// step 2: go through all delta layers, and update the image layer info with overlapping
// key ranges
for layer in self.historic.iter() {
if !layer.is_delta {
continue;
}
let delta_key_range = layer.get_key_range();
let delta_lsn_range = layer.get_lsn_range();
for (img_end_key, img_info) in image_map.range_mut(delta_key_range.start..Key::MAX) {
debug_assert!(img_end_key >= &delta_key_range.start);
if delta_lsn_range.end > img_info.image_lsn {
// the delta layer includes WAL records after the image
// it's possibel that the delta layer's start LSN < image LSN, which will be simply ignored by step 3
img_info.min_delta_lsn =
std::cmp::min(img_info.min_delta_lsn, delta_lsn_range.start);
}
if img_end_key >= &delta_key_range.end {
// we have fully processed all overlapping image layers
break;
}
}
}
// step 3, go through all image layers and find the image consistent LSN
let mut img_consistent_lsn = min_l0_deltas_lsn.checked_sub(Lsn(1)).unwrap();
let mut prev_key = Key::MIN;
for (img_key, img_info) in image_map {
tracing::debug!(
"Image layer {:?}:{} has min delta lsn {}",
Range {
start: prev_key,
end: img_key,
},
img_info.image_lsn,
img_info.min_delta_lsn,
);
let image_lsn = std::cmp::max(
img_info.image_lsn,
img_info.min_delta_lsn.checked_sub(Lsn(1)).unwrap_or(Lsn(0)),
);
img_consistent_lsn = std::cmp::min(img_consistent_lsn, image_lsn);
prev_key = img_key;
}
tracing::info!(
"computed image_consistent_lsn {} for disk_consistent_lsn {} in {}ms. Processed {} layrs in total.",
img_consistent_lsn,
disk_consistent_lsn,
started_at.elapsed().as_millis(),
self.historic.len()
);
img_consistent_lsn
}
/* END_HADRON */
/// Return all L0 delta layers
pub fn level0_deltas(&self) -> &Vec<Arc<PersistentLayerDesc>> {
&self.l0_delta_layers
@@ -1579,6 +1677,138 @@ mod tests {
LayerVisibilityHint::Visible
));
}
/* BEGIN_HADRON */
#[test]
fn test_compute_image_consistent_lsn() {
let mut layer_map = LayerMap::default();
let disk_consistent_lsn = Lsn(1000);
// case 1: empty layer map
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
assert_eq!(
disk_consistent_lsn.checked_sub(Lsn(1)).unwrap(),
image_consistent_lsn
);
// case 2: only L0 delta layer
{
let mut updates = layer_map.batch_update();
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(0)..Key::from_i128(100),
Lsn(900)..Lsn(990),
true,
));
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(0)..Key::from_i128(100),
Lsn(850)..Lsn(899),
true,
));
}
// should use min L0 delta LSN - 1 as image consistent LSN
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
assert_eq!(Lsn(849), image_consistent_lsn);
// case 3: 3 images, no L1 delta
{
let mut updates = layer_map.batch_update();
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(0)..Key::from_i128(40),
Lsn(100)..Lsn(100),
false,
));
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(40)..Key::from_i128(70),
Lsn(200)..Lsn(200),
false,
));
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(70)..Key::from_i128(100),
Lsn(150)..Lsn(150),
false,
));
}
// should use min L0 delta LSN - 1 as image consistent LSN
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
assert_eq!(Lsn(849), image_consistent_lsn);
// case 4: 3 images with 1 L1 delta
{
let mut updates = layer_map.batch_update();
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(0)..Key::from_i128(50),
Lsn(300)..Lsn(350),
true,
));
}
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
assert_eq!(Lsn(299), image_consistent_lsn);
// case 5: 3 images with 1 more L1 delta with smaller LSN
{
let mut updates = layer_map.batch_update();
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(50)..Key::from_i128(72),
Lsn(200)..Lsn(300),
true,
));
}
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
assert_eq!(Lsn(199), image_consistent_lsn);
// case 6: 3 images with more newer L1 deltas (no impact on final results)
{
let mut updates = layer_map.batch_update();
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(0)..Key::from_i128(30),
Lsn(400)..Lsn(500),
true,
));
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(35)..Key::from_i128(100),
Lsn(450)..Lsn(600),
true,
));
}
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
assert_eq!(Lsn(199), image_consistent_lsn);
// case 7: 3 images with more older L1 deltas (no impact on final results)
{
let mut updates = layer_map.batch_update();
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(0)..Key::from_i128(40),
Lsn(0)..Lsn(50),
true,
));
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(50)..Key::from_i128(100),
Lsn(10)..Lsn(60),
true,
));
}
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
assert_eq!(Lsn(199), image_consistent_lsn);
// case 8: 3 images with one more L1 delta with overlapping LSN range
{
let mut updates = layer_map.batch_update();
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(0)..Key::from_i128(50),
Lsn(50)..Lsn(250),
true,
));
}
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
assert_eq!(Lsn(100), image_consistent_lsn);
}
/* END_HADRON */
}
#[cfg(test)]

View File

@@ -1678,6 +1678,8 @@ impl TenantManager {
// Phase 6: Release the InProgress on the parent shard
drop(parent_slot_guard);
utils::pausable_failpoint!("shard-split-post-finish-pause");
Ok(child_shards)
}

View File

@@ -351,13 +351,6 @@ pub struct Timeline {
last_image_layer_creation_check_at: AtomicLsn,
last_image_layer_creation_check_instant: std::sync::Mutex<Option<Instant>>,
// HADRON
/// If a key range has writes with LSN > force_image_creation_lsn, then we should force image layer creation
/// on this key range.
force_image_creation_lsn: AtomicLsn,
/// The last time instant when force_image_creation_lsn is computed.
force_image_creation_lsn_computed_at: std::sync::Mutex<Option<Instant>>,
/// Current logical size of the "datadir", at the last LSN.
current_logical_size: LogicalSize,
@@ -2854,7 +2847,7 @@ impl Timeline {
}
// HADRON
fn get_image_creation_timeout(&self) -> Option<Duration> {
fn get_image_layer_force_creation_period(&self) -> Option<Duration> {
let tenant_conf = self.tenant_conf.load();
tenant_conf
.tenant_conf
@@ -3134,9 +3127,6 @@ impl Timeline {
repartition_threshold: 0,
last_image_layer_creation_check_at: AtomicLsn::new(0),
last_image_layer_creation_check_instant: Mutex::new(None),
// HADRON
force_image_creation_lsn: AtomicLsn::new(0),
force_image_creation_lsn_computed_at: std::sync::Mutex::new(None),
last_received_wal: Mutex::new(None),
rel_size_latest_cache: RwLock::new(HashMap::new()),
rel_size_snapshot_cache: Mutex::new(LruCache::new(relsize_snapshot_cache_capacity)),
@@ -5381,13 +5371,16 @@ impl Timeline {
}
// HADRON
// for child timelines, we consider all pages up to ancestor_LSN are redone successfully by the parent timeline
min_image_lsn = min_image_lsn.max(self.get_ancestor_lsn());
if min_image_lsn < force_image_creation_lsn.unwrap_or(Lsn(0)) && max_deltas > 0 {
info!(
"forcing image creation for partitioned range {}-{}. Min image LSN: {}, force image creation LSN: {}",
"forcing image creation for partitioned range {}-{}. Min image LSN: {}, force image creation LSN: {}, num deltas: {}",
partition.ranges[0].start,
partition.ranges[0].end,
min_image_lsn,
force_image_creation_lsn.unwrap()
force_image_creation_lsn.unwrap(),
max_deltas
);
return true;
}
@@ -5611,10 +5604,11 @@ impl Timeline {
/// Predicate function which indicates whether we should check if new image layers
/// are required. Since checking if new image layers are required is expensive in
/// terms of CPU, we only do it in the following cases:
/// 1. If the timeline has ingested sufficient WAL to justify the cost
/// 1. If the timeline has ingested sufficient WAL to justify the cost or ...
/// 2. If enough time has passed since the last check:
/// 1. For large tenants, we wish to perform the check more often since they
/// suffer from the lack of image layers
/// suffer from the lack of image layers. Note that we assume sharded tenants
/// to be large since non-zero shards do not track the logical size.
/// 2. For small tenants (that can mostly fit in RAM), we use a much longer interval
fn should_check_if_image_layers_required(self: &Arc<Timeline>, lsn: Lsn) -> bool {
let large_timeline_threshold = self.conf.image_layer_generation_large_timeline_threshold;
@@ -5628,30 +5622,39 @@ impl Timeline {
let distance_based_decision = distance.0 >= min_distance;
let mut time_based_decision = false;
let mut last_check_instant = self.last_image_layer_creation_check_instant.lock().unwrap();
if let CurrentLogicalSize::Exact(logical_size) = self.current_logical_size.current_size() {
let check_required_after =
if Some(Into::<u64>::into(&logical_size)) >= large_timeline_threshold {
self.get_checkpoint_timeout()
} else {
Duration::from_secs(3600 * 48)
};
time_based_decision = match *last_check_instant {
Some(last_check) => {
let elapsed = last_check.elapsed();
elapsed >= check_required_after
let check_required_after = (|| {
if self.shard_identity.is_unsharded() {
if let CurrentLogicalSize::Exact(logical_size) =
self.current_logical_size.current_size()
{
if Some(Into::<u64>::into(&logical_size)) < large_timeline_threshold {
return Duration::from_secs(3600 * 48);
}
}
None => true,
};
}
}
self.get_checkpoint_timeout()
})();
let time_based_decision = match *last_check_instant {
Some(last_check) => {
let elapsed = last_check.elapsed();
elapsed >= check_required_after
}
None => true,
};
// Do the expensive delta layer counting only if this timeline has ingested sufficient
// WAL since the last check or a checkpoint timeout interval has elapsed since the last
// check.
let decision = distance_based_decision || time_based_decision;
tracing::info!(
"Decided to check image layers: {}. Distance-based decision: {}, time-based decision: {}",
decision,
distance_based_decision,
time_based_decision
);
if decision {
self.last_image_layer_creation_check_at.store(lsn);
*last_check_instant = Some(Instant::now());
@@ -7153,6 +7156,19 @@ impl Timeline {
.unwrap()
.clone()
}
/* BEGIN_HADRON */
pub(crate) async fn compute_image_consistent_lsn(&self) -> anyhow::Result<Lsn> {
let guard = self
.layers
.read(LayerManagerLockHolder::ComputeImageConsistentLsn)
.await;
let layer_map = guard.layer_map()?;
let disk_consistent_lsn = self.get_disk_consistent_lsn();
Ok(layer_map.compute_image_consistent_lsn(disk_consistent_lsn))
}
/* END_HADRON */
}
impl Timeline {

View File

@@ -8,7 +8,7 @@ use std::cmp::min;
use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
use std::ops::{Deref, Range};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use std::time::{Duration, Instant};
use super::layer_manager::LayerManagerLockHolder;
use super::{
@@ -34,7 +34,6 @@ use pageserver_api::models::{CompactInfoResponse, CompactKeyRange};
use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
use pageserver_compaction::helpers::{fully_contains, overlaps_with};
use pageserver_compaction::interface::*;
use postgres_ffi::to_pg_timestamp;
use serde::Serialize;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio_util::sync::CancellationToken;
@@ -47,7 +46,6 @@ use wal_decoder::models::value::Value;
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
use crate::page_cache;
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::statvfs::Statvfs;
use crate::tenant::checks::check_valid_layermap;
use crate::tenant::gc_block::GcBlock;
@@ -1271,10 +1269,7 @@ impl Timeline {
// Define partitioning schema if needed
// HADRON
let force_image_creation_lsn = self
.get_or_compute_force_image_creation_lsn(cancel, ctx)
.await
.map_err(CompactionError::Other)?;
let force_image_creation_lsn = self.get_force_image_creation_lsn();
// 1. L0 Compact
let l0_outcome = {
@@ -1484,59 +1479,37 @@ impl Timeline {
}
/* BEGIN_HADRON */
// Get the force image creation LSN. Compute it if the last computed LSN is too old.
async fn get_or_compute_force_image_creation_lsn(
self: &Arc<Self>,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<Option<Lsn>> {
const FORCE_IMAGE_CREATION_LSN_COMPUTE_INTERVAL: Duration = Duration::from_secs(10 * 60); // 10 minutes
let image_layer_force_creation_period = self.get_image_creation_timeout();
if image_layer_force_creation_period.is_none() {
return Ok(None);
// Get the force image creation LSN based on gc_cutoff_lsn.
// Note that this is an estimation and the workload rate may suddenly change. When that happens,
// the force image creation may be too early or too late, but eventually it should be able to catch up.
pub(crate) fn get_force_image_creation_lsn(self: &Arc<Self>) -> Option<Lsn> {
let image_creation_period = self.get_image_layer_force_creation_period()?;
let current_lsn = self.get_last_record_lsn();
let pitr_lsn = self.gc_info.read().unwrap().cutoffs.time?;
let pitr_interval = self.get_pitr_interval();
if pitr_lsn == Lsn::INVALID || pitr_interval.is_zero() {
tracing::warn!(
"pitr LSN/interval not found, skipping force image creation LSN calculation"
);
return None;
}
let image_layer_force_creation_period = image_layer_force_creation_period.unwrap();
let force_image_creation_lsn_computed_at =
*self.force_image_creation_lsn_computed_at.lock().unwrap();
if force_image_creation_lsn_computed_at.is_none()
|| force_image_creation_lsn_computed_at.unwrap().elapsed()
> FORCE_IMAGE_CREATION_LSN_COMPUTE_INTERVAL
{
let now: SystemTime = SystemTime::now();
let timestamp = now
.checked_sub(image_layer_force_creation_period)
.ok_or_else(|| {
anyhow::anyhow!(
"image creation timeout is too large: {image_layer_force_creation_period:?}"
)
})?;
let timestamp = to_pg_timestamp(timestamp);
let force_image_creation_lsn = match self
.find_lsn_for_timestamp(timestamp, cancel, ctx)
.await?
{
LsnForTimestamp::Present(lsn) | LsnForTimestamp::Future(lsn) => lsn,
_ => {
let gc_lsn = *self.get_applied_gc_cutoff_lsn();
tracing::info!(
"no LSN found for timestamp {timestamp:?}, using latest GC cutoff LSN {}",
gc_lsn
);
gc_lsn
}
};
self.force_image_creation_lsn
.store(force_image_creation_lsn);
*self.force_image_creation_lsn_computed_at.lock().unwrap() = Some(Instant::now());
tracing::info!(
"computed force image creation LSN: {}",
force_image_creation_lsn
);
Ok(Some(force_image_creation_lsn))
} else {
Ok(Some(self.force_image_creation_lsn.load()))
}
let delta_lsn = current_lsn.checked_sub(pitr_lsn).unwrap().0
* image_creation_period.as_secs()
/ pitr_interval.as_secs();
let force_image_creation_lsn = current_lsn.checked_sub(delta_lsn).unwrap_or(Lsn(0));
tracing::info!(
"Tenant shard {} computed force_image_creation_lsn: {}. Current lsn: {}, image_layer_force_creation_period: {:?}, GC cutoff: {}, PITR interval: {:?}",
self.tenant_shard_id,
force_image_creation_lsn,
current_lsn,
image_creation_period,
pitr_lsn,
pitr_interval
);
Some(force_image_creation_lsn)
}
/* END_HADRON */

View File

@@ -359,14 +359,14 @@ impl<T: Types> Cache<T> {
Err(e) => {
// Retry on tenant manager error to handle tenant split more gracefully
if attempt < GET_MAX_RETRIES {
tracing::warn!(
"Fail to resolve tenant shard in attempt {}: {:?}. Retrying...",
attempt,
e
);
tokio::time::sleep(RETRY_BACKOFF).await;
continue;
} else {
tracing::warn!(
"Failed to resolve tenant shard after {} attempts: {:?}",
GET_MAX_RETRIES,
e
);
return Err(e);
}
}

View File

@@ -47,6 +47,7 @@ pub(crate) enum LayerManagerLockHolder {
ImportPgData,
DetachAncestor,
Eviction,
ComputeImageConsistentLsn,
#[cfg(test)]
Testing,
}

View File

@@ -147,6 +147,16 @@ pub enum RedoAttemptType {
GcCompaction,
}
impl std::fmt::Display for RedoAttemptType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RedoAttemptType::ReadPage => write!(f, "read page"),
RedoAttemptType::LegacyCompaction => write!(f, "legacy compaction"),
RedoAttemptType::GcCompaction => write!(f, "gc compaction"),
}
}
}
///
/// Public interface of WAL redo manager
///
@@ -199,6 +209,7 @@ impl PostgresRedoManager {
self.conf.wal_redo_timeout,
pg_version,
max_retry_attempts,
redo_attempt_type,
)
.await
};
@@ -221,6 +232,7 @@ impl PostgresRedoManager {
self.conf.wal_redo_timeout,
pg_version,
max_retry_attempts,
redo_attempt_type,
)
.await
}
@@ -445,6 +457,7 @@ impl PostgresRedoManager {
wal_redo_timeout: Duration,
pg_version: PgMajorVersion,
max_retry_attempts: u32,
redo_attempt_type: RedoAttemptType,
) -> Result<Bytes, Error> {
*(self.last_redo_at.lock().unwrap()) = Some(Instant::now());
@@ -485,17 +498,28 @@ impl PostgresRedoManager {
);
if let Err(e) = result.as_ref() {
error!(
"error applying {} WAL records {}..{} ({} bytes) to key {key}, from base image with LSN {} to reconstruct page image at LSN {} n_attempts={}: {:?}",
records.len(),
records.first().map(|p| p.0).unwrap_or(Lsn(0)),
records.last().map(|p| p.0).unwrap_or(Lsn(0)),
nbytes,
base_img_lsn,
lsn,
n_attempts,
e,
);
macro_rules! message {
($level:tt) => {
$level!(
"error applying {} WAL records {}..{} ({} bytes) to key {} during {}, from base image with LSN {} to reconstruct page image at LSN {} n_attempts={}: {:?}",
records.len(),
records.first().map(|p| p.0).unwrap_or(Lsn(0)),
records.last().map(|p| p.0).unwrap_or(Lsn(0)),
nbytes,
key,
redo_attempt_type,
base_img_lsn,
lsn,
n_attempts,
e,
)
}
}
match redo_attempt_type {
RedoAttemptType::ReadPage => message!(error),
RedoAttemptType::LegacyCompaction => message!(error),
RedoAttemptType::GcCompaction => message!(warn),
}
}
result.map_err(Error::Other)

View File

@@ -162,8 +162,34 @@ typedef struct FileCacheControl
dlist_head lru; /* double linked list for LRU replacement
* algorithm */
dlist_head holes; /* double linked list of punched holes */
HyperLogLogState wss_estimation; /* estimation of working set size */
ConditionVariable cv[N_COND_VARS]; /* turnstile of condition variables */
/*
* Estimation of working set size.
*
* This is not guarded by the lock. No locking is needed because all the
* writes to the "registers" are simple 64-bit stores, to update a
* timestamp. We assume that:
*
* - 64-bit stores are atomic. We could enforce that by using
* pg_atomic_uint64 instead of TimestampTz as the datatype in hll.h, but
* for now we just rely on it implicitly.
*
* - Even if they're not, and there is a race between two stores, it
* doesn't matter much which one wins because they're both updating the
* register with the current timestamp. Or you have a race between
* resetting the register and updating it, in which case it also doesn't
* matter much which one wins.
*
* - If they're not atomic, you might get an occasional "torn write" if
* you're really unlucky, but we tolerate that too. It just means that
* the estimate will be a little off, until the register is updated
* again.
*/
HyperLogLogState wss_estimation;
/* Prewarmer state */
PrewarmWorkerState prewarm_workers[MAX_PREWARM_WORKERS];
size_t n_prewarm_workers;
size_t n_prewarm_entries;
@@ -205,6 +231,8 @@ bool AmPrewarmWorker;
#define LFC_ENABLED() (lfc_ctl->limit != 0)
PGDLLEXPORT void lfc_prewarm_main(Datum main_arg);
/*
* Close LFC file if opened.
* All backends should close their LFC files once LFC is disabled.
@@ -1162,6 +1190,13 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
/* Update working set size estimate for the blocks */
for (int i = 0; i < nblocks; i++)
{
tag.blockNum = blkno + i;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
}
/*
* For every chunk that has blocks we're interested in, we
* 1. get the chunk header
@@ -1240,14 +1275,6 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
}
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
/* Approximate working set for the blocks assumed in this entry */
for (int i = 0; i < blocks_in_chunk; i++)
{
tag.blockNum = blkno + i;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
}
if (entry == NULL)
{
/* Pages are not cached */
@@ -1526,9 +1553,15 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
return false;
CopyNRelFileInfoToBufTag(tag, rinfo);
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
tag.forkNum = forknum;
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
/* Update working set size estimate for the blocks */
if (lfc_prewarm_update_ws_estimation)
{
tag.blockNum = blkno;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
}
tag.blockNum = blkno - chunk_offs;
hash = get_hash_value(lfc_hash, &tag);
@@ -1546,19 +1579,13 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
if (lwlsn > lsn)
{
elog(DEBUG1, "Skip LFC write for %d because LwLSN=%X/%X is greater than not_nodified_since LSN %X/%X",
elog(DEBUG1, "Skip LFC write for %u because LwLSN=%X/%X is greater than not_nodified_since LSN %X/%X",
blkno, LSN_FORMAT_ARGS(lwlsn), LSN_FORMAT_ARGS(lsn));
LWLockRelease(lfc_lock);
return false;
}
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found);
if (lfc_prewarm_update_ws_estimation)
{
tag.blockNum = blkno;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
}
if (found)
{
state = GET_STATE(entry, chunk_offs);
@@ -1673,9 +1700,15 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
return;
CopyNRelFileInfoToBufTag(tag, rinfo);
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
tag.forkNum = forkNum;
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
/* Update working set size estimate for the blocks */
for (int i = 0; i < nblocks; i++)
{
tag.blockNum = blkno + i;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
}
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
@@ -1716,14 +1749,6 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
cv = &lfc_ctl->cv[hash % N_COND_VARS];
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found);
/* Approximate working set for the blocks assumed in this entry */
for (int i = 0; i < blocks_in_chunk; i++)
{
tag.blockNum = blkno + i;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
}
if (found)
{
/*
@@ -2160,6 +2185,10 @@ local_cache_pages(PG_FUNCTION_ARGS)
}
/*
* Internal implementation of the approximate_working_set_size_seconds()
* function.
*/
int32
lfc_approximate_working_set_size_seconds(time_t duration, bool reset)
{
@@ -2168,11 +2197,9 @@ lfc_approximate_working_set_size_seconds(time_t duration, bool reset)
if (lfc_size_limit == 0)
return -1;
LWLockAcquire(lfc_lock, LW_SHARED);
dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, duration);
if (reset)
memset(lfc_ctl->wss_estimation.regs, 0, sizeof lfc_ctl->wss_estimation.regs);
LWLockRelease(lfc_lock);
return dc;
}

View File

@@ -50,7 +50,8 @@ extern bool lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blk
extern FileCacheState* lfc_get_state(size_t max_entries);
extern void lfc_prewarm(FileCacheState* fcs, uint32 n_workers);
PGDLLEXPORT void lfc_prewarm_main(Datum main_arg);
extern int32 lfc_approximate_working_set_size_seconds(time_t duration, bool reset);
extern int32 lfc_approximate_working_set_size_seconds(time_t duration, bool reset);

View File

@@ -652,8 +652,8 @@ approximate_working_set_size_seconds(PG_FUNCTION_ARGS)
Datum
approximate_working_set_size(PG_FUNCTION_ARGS)
{
int32 dc;
bool reset = PG_GETARG_BOOL(0);
int32 dc;
if (neon_enable_new_communicator)
dc = communicator_new_approximate_working_set_size_seconds(-1, reset);

View File

@@ -236,13 +236,13 @@ clear_buffer_cache(PG_FUNCTION_ARGS)
bool save_neon_test_evict;
/*
* Temporarily set the zenith_test_evict GUC, so that when we pin and
* Temporarily set the neon_test_evict GUC, so that when we pin and
* unpin a buffer, the buffer is evicted. We use that hack to evict all
* buffers, as there is no explicit "evict this buffer" function in the
* buffer manager.
*/
save_neon_test_evict = zenith_test_evict;
zenith_test_evict = true;
save_neon_test_evict = neon_test_evict;
neon_test_evict = true;
PG_TRY();
{
/* Scan through all the buffers */
@@ -273,7 +273,7 @@ clear_buffer_cache(PG_FUNCTION_ARGS)
/*
* Pin the buffer, and release it again. Because we have
* zenith_test_evict==true, this will evict the page from the
* neon_test_evict==true, this will evict the page from the
* buffer cache if no one else is holding a pin on it.
*/
if (isvalid)
@@ -286,7 +286,7 @@ clear_buffer_cache(PG_FUNCTION_ARGS)
PG_FINALLY();
{
/* restore the GUC */
zenith_test_evict = save_neon_test_evict;
neon_test_evict = save_neon_test_evict;
}
PG_END_TRY();

View File

@@ -2953,17 +2953,17 @@ XmlTableBuilderData
YYLTYPE
YYSTYPE
YY_BUFFER_STATE
ZenithErrorResponse
ZenithExistsRequest
ZenithExistsResponse
ZenithGetPageRequest
ZenithGetPageResponse
ZenithMessage
ZenithMessageTag
ZenithNblocksRequest
ZenithNblocksResponse
ZenithRequest
ZenithResponse
NeonErrorResponse
NeonExistsRequest
NeonExistsResponse
NeonGetPageRequest
NeonGetPageResponse
NeonMessage
NeonMessageTag
NeonNblocksRequest
NeonNblocksResponse
NeonRequest
NeonResponse
_SPI_connection
_SPI_plan
__AssignProcessToJobObject

View File

@@ -16,6 +16,7 @@ async-compression.workspace = true
async-trait.workspace = true
atomic-take.workspace = true
aws-config.workspace = true
aws-credential-types.workspace = true
aws-sdk-iam.workspace = true
aws-sigv4.workspace = true
base64.workspace = true
@@ -48,6 +49,7 @@ indexmap = { workspace = true, features = ["serde"] }
ipnet.workspace = true
itertools.workspace = true
itoa.workspace = true
json = { path = "../libs/proxy/json" }
lasso = { workspace = true, features = ["multi-threaded"] }
measured = { workspace = true, features = ["lasso"] }
metrics.workspace = true
@@ -127,4 +129,4 @@ rstest.workspace = true
walkdir.workspace = true
rand_distr = "0.4"
tokio-postgres.workspace = true
tracing-test = "0.2"
tracing-test = "0.2"

View File

@@ -123,6 +123,11 @@ docker exec -it proxy-postgres psql -U postgres -c "CREATE TABLE neon_control_pl
docker exec -it proxy-postgres psql -U postgres -c "CREATE ROLE proxy WITH SUPERUSER LOGIN PASSWORD 'password';"
```
If you want to test query cancellation, redis is also required:
```sh
docker run --detach --name proxy-redis --publish 6379:6379 redis:7.0
```
Let's create self-signed certificate by running:
```sh
openssl req -new -x509 -days 365 -nodes -text -out server.crt -keyout server.key -subj "/CN=*.local.neon.build"
@@ -130,7 +135,10 @@ openssl req -new -x509 -days 365 -nodes -text -out server.crt -keyout server.key
Then we need to build proxy with 'testing' feature and run, e.g.:
```sh
RUST_LOG=proxy LOGFMT=text cargo run -p proxy --bin proxy --features testing -- --auth-backend postgres --auth-endpoint 'postgresql://postgres:proxy-postgres@127.0.0.1:5432/postgres' -c server.crt -k server.key
RUST_LOG=proxy LOGFMT=text cargo run -p proxy --bin proxy --features testing -- \
--auth-backend postgres --auth-endpoint 'postgresql://postgres:proxy-postgres@127.0.0.1:5432/postgres' \
--redis-auth-type="plain" --redis-plain="redis://127.0.0.1:6379" \
-c server.crt -k server.key
```
Now from client you can start a new session:

View File

@@ -7,13 +7,17 @@ use std::pin::pin;
use std::sync::Mutex;
use scopeguard::ScopeGuard;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use crate::ext::LockExt;
type ProcResult<P> = Result<<P as QueueProcessing>::Res, <P as QueueProcessing>::Err>;
pub trait QueueProcessing: Send + 'static {
type Req: Send + 'static;
type Res: Send;
type Err: Send + Clone;
/// Get the desired batch size.
fn batch_size(&self, queue_size: usize) -> usize;
@@ -24,7 +28,18 @@ pub trait QueueProcessing: Send + 'static {
/// If this apply can error, it's expected that errors be forwarded to each Self::Res.
///
/// Batching does not need to happen atomically.
fn apply(&mut self, req: Vec<Self::Req>) -> impl Future<Output = Vec<Self::Res>> + Send;
fn apply(
&mut self,
req: Vec<Self::Req>,
) -> impl Future<Output = Result<Vec<Self::Res>, Self::Err>> + Send;
}
#[derive(thiserror::Error)]
pub enum BatchQueueError<E: Clone, C> {
#[error(transparent)]
Result(E),
#[error(transparent)]
Cancelled(C),
}
pub struct BatchQueue<P: QueueProcessing> {
@@ -34,7 +49,7 @@ pub struct BatchQueue<P: QueueProcessing> {
struct BatchJob<P: QueueProcessing> {
req: P::Req,
res: tokio::sync::oneshot::Sender<P::Res>,
res: tokio::sync::oneshot::Sender<Result<P::Res, P::Err>>,
}
impl<P: QueueProcessing> BatchQueue<P> {
@@ -55,11 +70,11 @@ impl<P: QueueProcessing> BatchQueue<P> {
&self,
req: P::Req,
cancelled: impl Future<Output = R>,
) -> Result<P::Res, R> {
) -> Result<P::Res, BatchQueueError<P::Err, R>> {
let (id, mut rx) = self.inner.lock_propagate_poison().register_job(req);
let mut cancelled = pin!(cancelled);
let resp = loop {
let resp: Option<Result<P::Res, P::Err>> = loop {
// try become the leader, or try wait for success.
let mut processor = tokio::select! {
// try become leader.
@@ -72,7 +87,7 @@ impl<P: QueueProcessing> BatchQueue<P> {
if inner.queue.remove(&id).is_some() {
tracing::warn!("batched task cancelled before completion");
}
return Err(cancel);
return Err(BatchQueueError::Cancelled(cancel));
},
};
@@ -96,18 +111,30 @@ impl<P: QueueProcessing> BatchQueue<P> {
// good: we didn't get cancelled.
ScopeGuard::into_inner(cancel_safety);
if values.len() != resps.len() {
tracing::error!(
"batch: invalid response size, expected={}, got={}",
resps.len(),
values.len()
);
}
match values {
Ok(values) => {
if values.len() != resps.len() {
tracing::error!(
"batch: invalid response size, expected={}, got={}",
resps.len(),
values.len()
);
}
// send response values.
for (tx, value) in std::iter::zip(resps, values) {
if tx.send(value).is_err() {
// receiver hung up but that's fine.
// send response values.
for (tx, value) in std::iter::zip(resps, values) {
if tx.send(Ok(value)).is_err() {
// receiver hung up but that's fine.
}
}
}
Err(err) => {
for tx in resps {
if tx.send(Err(err.clone())).is_err() {
// receiver hung up but that's fine.
}
}
}
}
@@ -129,7 +156,8 @@ impl<P: QueueProcessing> BatchQueue<P> {
tracing::debug!(id, "batch: job completed");
Ok(resp.expect("no response found. batch processer should not panic"))
resp.expect("no response found. batch processer should not panic")
.map_err(BatchQueueError::Result)
}
}
@@ -139,8 +167,8 @@ struct BatchQueueInner<P: QueueProcessing> {
}
impl<P: QueueProcessing> BatchQueueInner<P> {
fn register_job(&mut self, req: P::Req) -> (u64, tokio::sync::oneshot::Receiver<P::Res>) {
let (tx, rx) = tokio::sync::oneshot::channel();
fn register_job(&mut self, req: P::Req) -> (u64, oneshot::Receiver<ProcResult<P>>) {
let (tx, rx) = oneshot::channel();
let id = self.version;
@@ -158,7 +186,7 @@ impl<P: QueueProcessing> BatchQueueInner<P> {
(id, rx)
}
fn get_batch(&mut self, p: &P) -> (Vec<P::Req>, Vec<tokio::sync::oneshot::Sender<P::Res>>) {
fn get_batch(&mut self, p: &P) -> (Vec<P::Req>, Vec<oneshot::Sender<ProcResult<P>>>) {
let batch_size = p.batch_size(self.queue.len());
let mut reqs = Vec::with_capacity(batch_size);
let mut resps = Vec::with_capacity(batch_size);

View File

@@ -522,15 +522,7 @@ pub async fn run() -> anyhow::Result<()> {
maintenance_tasks.spawn(usage_metrics::task_main(metrics_config));
}
if let Either::Left(auth::Backend::ControlPlane(api, ())) = &auth_backend
&& let crate::control_plane::client::ControlPlaneClient::ProxyV1(api) = &**api
&& let Some(client) = redis_client
{
// project info cache and invalidation of that cache.
let cache = api.caches.project_info.clone();
maintenance_tasks.spawn(notifications::task_main(client.clone(), cache.clone()));
maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });
if let Some(client) = redis_client {
// Try to connect to Redis 3 times with 1 + (0..0.1) second interval.
// This prevents immediate exit and pod restart,
// which can cause hammering of the redis in case of connection issues.
@@ -560,6 +552,16 @@ pub async fn run() -> anyhow::Result<()> {
}
}
}
#[allow(irrefutable_let_patterns)]
if let Either::Left(auth::Backend::ControlPlane(api, ())) = &auth_backend
&& let crate::control_plane::client::ControlPlaneClient::ProxyV1(api) = &**api
{
// project info cache and invalidation of that cache.
let cache = api.caches.project_info.clone();
maintenance_tasks.spawn(notifications::task_main(client, cache.clone()));
maintenance_tasks.spawn(async move { cache.gc_worker().await });
}
}
let maintenance = loop {

View File

@@ -4,12 +4,11 @@ use std::pin::pin;
use std::sync::{Arc, OnceLock};
use std::time::Duration;
use anyhow::anyhow;
use futures::FutureExt;
use ipnet::{IpNet, Ipv4Net, Ipv6Net};
use postgres_client::RawCancelToken;
use postgres_client::tls::MakeTlsConnect;
use redis::{Cmd, FromRedisValue, Value};
use redis::{Cmd, FromRedisValue, SetExpiry, SetOptions, Value};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::net::TcpStream;
@@ -18,7 +17,7 @@ use tracing::{debug, error, info};
use crate::auth::AuthError;
use crate::auth::backend::ComputeUserInfo;
use crate::batch::{BatchQueue, QueueProcessing};
use crate::batch::{BatchQueue, BatchQueueError, QueueProcessing};
use crate::config::ComputeConfig;
use crate::context::RequestContext;
use crate::control_plane::ControlPlaneApi;
@@ -28,7 +27,7 @@ use crate::metrics::{CancelChannelSizeGuard, CancellationRequest, Metrics, Redis
use crate::pqproto::CancelKeyData;
use crate::rate_limiter::LeakyBucketRateLimiter;
use crate::redis::keys::KeyPrefix;
use crate::redis::kv_ops::RedisKVClient;
use crate::redis::kv_ops::{RedisKVClient, RedisKVClientError};
type IpSubnetKey = IpNet;
@@ -45,6 +44,17 @@ pub enum CancelKeyOp {
GetCancelData {
key: CancelKeyData,
},
GetCancelDataOld {
key: CancelKeyData,
},
}
#[derive(thiserror::Error, Debug, Clone)]
pub enum PipelineError {
#[error("could not send cmd to redis: {0}")]
RedisKVClient(Arc<RedisKVClientError>),
#[error("incorrect number of responses from redis")]
IncorrectNumberOfResponses,
}
pub struct Pipeline {
@@ -60,7 +70,7 @@ impl Pipeline {
}
}
async fn execute(self, client: &mut RedisKVClient) -> Vec<anyhow::Result<Value>> {
async fn execute(self, client: &mut RedisKVClient) -> Result<Vec<Value>, PipelineError> {
let responses = self.replies;
let batch_size = self.inner.len();
@@ -78,30 +88,20 @@ impl Pipeline {
batch_size,
responses, "successfully completed cancellation jobs",
);
values.into_iter().map(Ok).collect()
Ok(values.into_iter().collect())
}
Ok(value) => {
error!(batch_size, ?value, "unexpected redis return value");
std::iter::repeat_with(|| Err(anyhow!("incorrect response type from redis")))
.take(responses)
.collect()
}
Err(err) => {
std::iter::repeat_with(|| Err(anyhow!("could not send cmd to redis: {err}")))
.take(responses)
.collect()
Err(PipelineError::IncorrectNumberOfResponses)
}
Err(err) => Err(PipelineError::RedisKVClient(Arc::new(err))),
}
}
fn add_command_with_reply(&mut self, cmd: Cmd) {
fn add_command(&mut self, cmd: Cmd) {
self.inner.add_command(cmd);
self.replies += 1;
}
fn add_command_no_reply(&mut self, cmd: Cmd) {
self.inner.add_command(cmd).ignore();
}
}
impl CancelKeyOp {
@@ -109,12 +109,19 @@ impl CancelKeyOp {
match self {
CancelKeyOp::StoreCancelKey { key, value, expire } => {
let key = KeyPrefix::Cancel(*key).build_redis_key();
pipe.add_command_with_reply(Cmd::hset(&key, "data", &**value));
pipe.add_command_no_reply(Cmd::expire(&key, expire.as_secs() as i64));
pipe.add_command(Cmd::set_options(
&key,
&**value,
SetOptions::default().with_expiration(SetExpiry::EX(expire.as_secs())),
));
}
CancelKeyOp::GetCancelDataOld { key } => {
let key = KeyPrefix::Cancel(*key).build_redis_key();
pipe.add_command(Cmd::hget(key, "data"));
}
CancelKeyOp::GetCancelData { key } => {
let key = KeyPrefix::Cancel(*key).build_redis_key();
pipe.add_command_with_reply(Cmd::hget(key, "data"));
pipe.add_command(Cmd::get(key));
}
}
}
@@ -127,13 +134,14 @@ pub struct CancellationProcessor {
impl QueueProcessing for CancellationProcessor {
type Req = (CancelChannelSizeGuard<'static>, CancelKeyOp);
type Res = anyhow::Result<redis::Value>;
type Res = redis::Value;
type Err = PipelineError;
fn batch_size(&self, _queue_size: usize) -> usize {
self.batch_size
}
async fn apply(&mut self, batch: Vec<Self::Req>) -> Vec<Self::Res> {
async fn apply(&mut self, batch: Vec<Self::Req>) -> Result<Vec<Self::Res>, Self::Err> {
if !self.client.credentials_refreshed() {
// this will cause a timeout for cancellation operations
tracing::debug!(
@@ -244,18 +252,18 @@ impl CancellationHandler {
&self,
key: CancelKeyData,
) -> Result<Option<CancelClosure>, CancelError> {
let guard = Metrics::get()
.proxy
.cancel_channel_size
.guard(RedisMsgKind::HGet);
let op = CancelKeyOp::GetCancelData { key };
const TIMEOUT: Duration = Duration::from_secs(5);
let Some(tx) = self.tx.get() else {
tracing::warn!("cancellation handler is not available");
return Err(CancelError::InternalError);
};
const TIMEOUT: Duration = Duration::from_secs(5);
let guard = Metrics::get()
.proxy
.cancel_channel_size
.guard(RedisMsgKind::Get);
let op = CancelKeyOp::GetCancelData { key };
let result = timeout(
TIMEOUT,
tx.call((guard, op), std::future::pending::<Infallible>()),
@@ -264,10 +272,37 @@ impl CancellationHandler {
.map_err(|_| {
tracing::warn!("timed out waiting to receive GetCancelData response");
CancelError::RateLimit
})?
// cannot be cancelled
.unwrap_or_else(|x| match x {})
.map_err(|e| {
})?;
// We may still have cancel keys set with HSET <key> "data".
// Check error type and retry with HGET.
// TODO: remove code after HSET is not used anymore.
let result = if let Err(err) = result.as_ref()
&& let BatchQueueError::Result(err) = err
&& let PipelineError::RedisKVClient(err) = err
&& let RedisKVClientError::Redis(err) = &**err
&& let Some(errcode) = err.code()
&& errcode == "WRONGTYPE"
{
let guard = Metrics::get()
.proxy
.cancel_channel_size
.guard(RedisMsgKind::HGet);
let op = CancelKeyOp::GetCancelDataOld { key };
timeout(
TIMEOUT,
tx.call((guard, op), std::future::pending::<Infallible>()),
)
.await
.map_err(|_| {
tracing::warn!("timed out waiting to receive GetCancelData response");
CancelError::RateLimit
})?
} else {
result
};
let result = result.map_err(|e| {
tracing::warn!("failed to receive GetCancelData response: {e}");
CancelError::InternalError
})?;
@@ -442,7 +477,7 @@ impl Session {
let guard = Metrics::get()
.proxy
.cancel_channel_size
.guard(RedisMsgKind::HSet);
.guard(RedisMsgKind::Set);
let op = CancelKeyOp::StoreCancelKey {
key: self.key,
value: closure_json.clone(),
@@ -456,7 +491,7 @@ impl Session {
);
match tx.call((guard, op), cancel.as_mut()).await {
Ok(Ok(_)) => {
Ok(_) => {
tracing::debug!(
src=%self.key,
dest=?cancel_closure.cancel_token,
@@ -467,10 +502,10 @@ impl Session {
tokio::time::sleep(CANCEL_KEY_REFRESH).await;
}
// retry immediately.
Ok(Err(error)) => {
Err(BatchQueueError::Result(error)) => {
tracing::warn!(?error, "error registering cancellation key");
}
Err(Err(_cancelled)) => break,
Err(BatchQueueError::Cancelled(Err(_cancelled))) => break,
}
}

View File

@@ -374,11 +374,9 @@ pub enum Waiting {
#[label(singleton = "kind")]
#[allow(clippy::enum_variant_names)]
pub enum RedisMsgKind {
HSet,
HSetMultiple,
Set,
Get,
HGet,
HGetAll,
HDel,
}
#[derive(Default, Clone)]

View File

@@ -4,11 +4,12 @@ use std::time::Duration;
use futures::FutureExt;
use redis::aio::{ConnectionLike, MultiplexedConnection};
use redis::{ConnectionInfo, IntoConnectionInfo, RedisConnectionInfo, RedisResult};
use redis::{ConnectionInfo, IntoConnectionInfo, RedisConnectionInfo, RedisError, RedisResult};
use tokio::task::AbortHandle;
use tracing::{error, info, warn};
use super::elasticache::CredentialsProvider;
use crate::redis::elasticache::CredentialsProviderError;
enum Credentials {
Static(ConnectionInfo),
@@ -26,6 +27,14 @@ impl Clone for Credentials {
}
}
#[derive(thiserror::Error, Debug)]
pub enum ConnectionProviderError {
#[error(transparent)]
Redis(#[from] RedisError),
#[error(transparent)]
CredentialsProvider(#[from] CredentialsProviderError),
}
/// A wrapper around `redis::MultiplexedConnection` that automatically refreshes the token.
/// Provides PubSub connection without credentials refresh.
pub struct ConnectionWithCredentialsProvider {
@@ -86,15 +95,18 @@ impl ConnectionWithCredentialsProvider {
}
}
async fn ping(con: &mut MultiplexedConnection) -> RedisResult<()> {
redis::cmd("PING").query_async(con).await
async fn ping(con: &mut MultiplexedConnection) -> Result<(), ConnectionProviderError> {
redis::cmd("PING")
.query_async(con)
.await
.map_err(Into::into)
}
pub(crate) fn credentials_refreshed(&self) -> bool {
self.credentials_refreshed.load(Ordering::Relaxed)
}
pub(crate) async fn connect(&mut self) -> anyhow::Result<()> {
pub(crate) async fn connect(&mut self) -> Result<(), ConnectionProviderError> {
let _guard = self.mutex.lock().await;
if let Some(con) = self.con.as_mut() {
match Self::ping(con).await {
@@ -141,7 +153,7 @@ impl ConnectionWithCredentialsProvider {
Ok(())
}
async fn get_connection_info(&self) -> anyhow::Result<ConnectionInfo> {
async fn get_connection_info(&self) -> Result<ConnectionInfo, ConnectionProviderError> {
match &self.credentials {
Credentials::Static(info) => Ok(info.clone()),
Credentials::Dynamic(provider, addr) => {
@@ -160,7 +172,7 @@ impl ConnectionWithCredentialsProvider {
}
}
async fn get_client(&self) -> anyhow::Result<redis::Client> {
async fn get_client(&self) -> Result<redis::Client, ConnectionProviderError> {
let client = redis::Client::open(self.get_connection_info().await?)?;
self.credentials_refreshed.store(true, Ordering::Relaxed);
Ok(client)

View File

@@ -9,10 +9,12 @@ use aws_config::meta::region::RegionProviderChain;
use aws_config::profile::ProfileFileCredentialsProvider;
use aws_config::provider_config::ProviderConfig;
use aws_config::web_identity_token::WebIdentityTokenCredentialsProvider;
use aws_credential_types::provider::error::CredentialsError;
use aws_sdk_iam::config::ProvideCredentials;
use aws_sigv4::http_request::{
self, SignableBody, SignableRequest, SignatureLocation, SigningSettings,
self, SignableBody, SignableRequest, SignatureLocation, SigningError, SigningSettings,
};
use aws_sigv4::sign::v4::signing_params::BuildError;
use tracing::info;
#[derive(Debug)]
@@ -40,6 +42,18 @@ impl AWSIRSAConfig {
}
}
#[derive(thiserror::Error, Debug)]
pub enum CredentialsProviderError {
#[error(transparent)]
AwsCredentials(#[from] CredentialsError),
#[error(transparent)]
AwsSigv4Build(#[from] BuildError),
#[error(transparent)]
AwsSigv4Singing(#[from] SigningError),
#[error(transparent)]
Http(#[from] http::Error),
}
/// Credentials provider for AWS elasticache authentication.
///
/// Official documentation:
@@ -92,7 +106,9 @@ impl CredentialsProvider {
})
}
pub(crate) async fn provide_credentials(&self) -> anyhow::Result<(String, String)> {
pub(crate) async fn provide_credentials(
&self,
) -> Result<(String, String), CredentialsProviderError> {
let aws_credentials = self
.credentials_provider
.provide_credentials()

View File

@@ -2,9 +2,18 @@ use std::time::Duration;
use futures::FutureExt;
use redis::aio::ConnectionLike;
use redis::{Cmd, FromRedisValue, Pipeline, RedisResult};
use redis::{Cmd, FromRedisValue, Pipeline, RedisError, RedisResult};
use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
use crate::redis::connection_with_credentials_provider::ConnectionProviderError;
#[derive(thiserror::Error, Debug)]
pub enum RedisKVClientError {
#[error(transparent)]
Redis(#[from] RedisError),
#[error(transparent)]
ConnectionProvider(#[from] ConnectionProviderError),
}
pub struct RedisKVClient {
client: ConnectionWithCredentialsProvider,
@@ -32,12 +41,13 @@ impl RedisKVClient {
Self { client }
}
pub async fn try_connect(&mut self) -> anyhow::Result<()> {
pub async fn try_connect(&mut self) -> Result<(), RedisKVClientError> {
self.client
.connect()
.boxed()
.await
.inspect_err(|e| tracing::error!("failed to connect to redis: {e}"))
.map_err(Into::into)
}
pub(crate) fn credentials_refreshed(&self) -> bool {
@@ -47,7 +57,7 @@ impl RedisKVClient {
pub(crate) async fn query<T: FromRedisValue>(
&mut self,
q: &impl Queryable,
) -> anyhow::Result<T> {
) -> Result<T, RedisKVClientError> {
let e = match q.query(&mut self.client).await {
Ok(t) => return Ok(t),
Err(e) => e,

View File

@@ -1,6 +1,7 @@
use json::{ListSer, ObjectSer, ValueSer};
use postgres_client::Row;
use postgres_client::types::{Kind, Type};
use serde_json::{Map, Value};
use serde_json::Value;
//
// Convert json non-string types to strings, so that they can be passed to Postgres
@@ -74,44 +75,40 @@ pub(crate) enum JsonConversionError {
UnbalancedString,
}
enum OutputMode {
Array(Vec<Value>),
Object(Map<String, Value>),
enum OutputMode<'a> {
Array(ListSer<'a>),
Object(ObjectSer<'a>),
}
impl OutputMode {
fn key(&mut self, key: &str) -> &mut Value {
impl OutputMode<'_> {
fn key(&mut self, key: &str) -> ValueSer<'_> {
match self {
OutputMode::Array(values) => push_entry(values, Value::Null),
OutputMode::Object(map) => map.entry(key.to_string()).or_insert(Value::Null),
OutputMode::Array(values) => values.entry(),
OutputMode::Object(map) => map.key(key),
}
}
fn finish(self) -> Value {
fn finish(self) {
match self {
OutputMode::Array(values) => Value::Array(values),
OutputMode::Object(map) => Value::Object(map),
OutputMode::Array(values) => values.finish(),
OutputMode::Object(map) => map.finish(),
}
}
}
fn push_entry<T>(arr: &mut Vec<T>, t: T) -> &mut T {
arr.push(t);
arr.last_mut().expect("a value was just inserted")
}
//
// Convert postgres row with text-encoded values to JSON object
//
pub(crate) fn pg_text_row_to_json(
output: ValueSer,
row: &Row,
raw_output: bool,
array_mode: bool,
) -> Result<Value, JsonConversionError> {
) -> Result<(), JsonConversionError> {
let mut entries = if array_mode {
OutputMode::Array(Vec::with_capacity(row.columns().len()))
OutputMode::Array(output.list())
} else {
OutputMode::Object(Map::with_capacity(row.columns().len()))
OutputMode::Object(output.object())
};
for (i, column) in row.columns().iter().enumerate() {
@@ -120,53 +117,48 @@ pub(crate) fn pg_text_row_to_json(
let value = entries.key(column.name());
match pg_value {
Some(v) if raw_output => *value = Value::String(v.to_string()),
Some(v) if raw_output => value.value(v),
Some(v) => pg_text_to_json(value, v, column.type_())?,
None => *value = Value::Null,
None => value.value(json::Null),
}
}
Ok(entries.finish())
entries.finish();
Ok(())
}
//
// Convert postgres text-encoded value to JSON value
//
fn pg_text_to_json(
output: &mut Value,
val: &str,
pg_type: &Type,
) -> Result<(), JsonConversionError> {
fn pg_text_to_json(output: ValueSer, val: &str, pg_type: &Type) -> Result<(), JsonConversionError> {
if let Kind::Array(elem_type) = pg_type.kind() {
// todo: we should fetch this from postgres.
let delimiter = ',';
let mut array = vec![];
pg_array_parse(&mut array, val, elem_type, delimiter)?;
*output = Value::Array(array);
json::value_as_list!(|output| pg_array_parse(output, val, elem_type, delimiter)?);
return Ok(());
}
match *pg_type {
Type::BOOL => *output = Value::Bool(val == "t"),
Type::BOOL => output.value(val == "t"),
Type::INT2 | Type::INT4 => {
let val = val.parse::<i32>()?;
*output = Value::Number(serde_json::Number::from(val));
output.value(val);
}
Type::FLOAT4 | Type::FLOAT8 => {
let fval = val.parse::<f64>()?;
let num = serde_json::Number::from_f64(fval);
if let Some(num) = num {
*output = Value::Number(num);
if fval.is_finite() {
output.value(fval);
} else {
// Pass Nan, Inf, -Inf as strings
// JS JSON.stringify() does converts them to null, but we
// want to preserve them, so we pass them as strings
*output = Value::String(val.to_string());
output.value(val);
}
}
Type::JSON | Type::JSONB => *output = serde_json::from_str(val)?,
_ => *output = Value::String(val.to_string()),
// we assume that the string value is valid json.
Type::JSON | Type::JSONB => output.write_raw_json(val.as_bytes()),
_ => output.value(val),
}
Ok(())
@@ -192,7 +184,7 @@ fn pg_text_to_json(
/// gets its own level of curly braces, and delimiters must be written between adjacent
/// curly-braced entities of the same level.
fn pg_array_parse(
elements: &mut Vec<Value>,
elements: &mut ListSer,
mut pg_array: &str,
elem: &Type,
delim: char,
@@ -221,7 +213,7 @@ fn pg_array_parse(
/// reads a single array from the `pg_array` string and pushes each values to `elements`.
/// returns the rest of the `pg_array` string that was not read.
fn pg_array_parse_inner<'a>(
elements: &mut Vec<Value>,
elements: &mut ListSer,
mut pg_array: &'a str,
elem: &Type,
delim: char,
@@ -234,7 +226,7 @@ fn pg_array_parse_inner<'a>(
let mut q = String::new();
loop {
let value = push_entry(elements, Value::Null);
let value = elements.entry();
pg_array = pg_array_parse_item(value, &mut q, pg_array, elem, delim)?;
// check for separator.
@@ -260,7 +252,7 @@ fn pg_array_parse_inner<'a>(
///
/// `quoted` is a scratch allocation that has no defined output.
fn pg_array_parse_item<'a>(
output: &mut Value,
output: ValueSer,
quoted: &mut String,
mut pg_array: &'a str,
elem: &Type,
@@ -276,9 +268,8 @@ fn pg_array_parse_item<'a>(
if pg_array.starts_with('{') {
// nested array.
let mut nested = vec![];
pg_array = pg_array_parse_inner(&mut nested, pg_array, elem, delim)?;
*output = Value::Array(nested);
pg_array =
json::value_as_list!(|output| pg_array_parse_inner(output, pg_array, elem, delim))?;
return Ok(pg_array);
}
@@ -306,7 +297,7 @@ fn pg_array_parse_item<'a>(
// we might have an item string:
// check for null
if item == "NULL" {
*output = Value::Null;
output.value(json::Null);
} else {
pg_text_to_json(output, item, elem)?;
}
@@ -440,15 +431,15 @@ mod tests {
}
fn pg_text_to_json(val: &str, pg_type: &Type) -> Value {
let mut v = Value::Null;
super::pg_text_to_json(&mut v, val, pg_type).unwrap();
v
let output = json::value_to_string!(|v| super::pg_text_to_json(v, val, pg_type).unwrap());
serde_json::from_str(&output).unwrap()
}
fn pg_array_parse(pg_array: &str, pg_type: &Type) -> Value {
let mut array = vec![];
super::pg_array_parse(&mut array, pg_array, pg_type, ',').unwrap();
Value::Array(array)
let output = json::value_to_string!(|v| json::value_as_list!(|v| {
super::pg_array_parse(v, pg_array, pg_type, ',').unwrap();
}));
serde_json::from_str(&output).unwrap()
}
#[test]

View File

@@ -14,10 +14,7 @@ use hyper::http::{HeaderName, HeaderValue};
use hyper::{Request, Response, StatusCode, header};
use indexmap::IndexMap;
use postgres_client::error::{DbError, ErrorPosition, SqlState};
use postgres_client::{
GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, RowStream, Transaction,
};
use serde::Serialize;
use postgres_client::{GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, Transaction};
use serde_json::Value;
use serde_json::value::RawValue;
use tokio::time::{self, Instant};
@@ -687,32 +684,21 @@ impl QueryData {
let (inner, mut discard) = client.inner();
let cancel_token = inner.cancel_token();
match select(
let mut json_buf = vec![];
let batch_result = match select(
pin!(query_to_json(
config,
&mut *inner,
self,
&mut 0,
json::ValueSer::new(&mut json_buf),
parsed_headers
)),
pin!(cancel.cancelled()),
)
.await
{
// The query successfully completed.
Either::Left((Ok((status, results)), __not_yet_cancelled)) => {
discard.check_idle(status);
let json_output =
serde_json::to_string(&results).expect("json serialization should not fail");
Ok(json_output)
}
// The query failed with an error
Either::Left((Err(e), __not_yet_cancelled)) => {
discard.discard();
Err(e)
}
// The query was cancelled.
Either::Left((res, __not_yet_cancelled)) => res,
Either::Right((_cancelled, query)) => {
tracing::info!("cancelling query");
if let Err(err) = cancel_token.cancel_query(NoTls).await {
@@ -721,13 +707,7 @@ impl QueryData {
// wait for the query cancellation
match time::timeout(time::Duration::from_millis(100), query).await {
// query successed before it was cancelled.
Ok(Ok((status, results))) => {
discard.check_idle(status);
let json_output = serde_json::to_string(&results)
.expect("json serialization should not fail");
Ok(json_output)
}
Ok(Ok(status)) => Ok(status),
// query failed or was cancelled.
Ok(Err(error)) => {
let db_error = match &error {
@@ -743,14 +723,29 @@ impl QueryData {
discard.discard();
}
Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres))
return Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres));
}
Err(_timeout) => {
discard.discard();
Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres))
return Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres));
}
}
}
};
match batch_result {
// The query successfully completed.
Ok(status) => {
discard.check_idle(status);
let json_output = String::from_utf8(json_buf).expect("json should be valid utf8");
Ok(json_output)
}
// The query failed with an error
Err(e) => {
discard.discard();
Err(e)
}
}
}
}
@@ -787,7 +782,7 @@ impl BatchQueryData {
})
.map_err(SqlOverHttpError::Postgres)?;
let json_output = match query_batch(
let json_output = match query_batch_to_json(
config,
cancel.child_token(),
&mut transaction,
@@ -845,24 +840,21 @@ async fn query_batch(
transaction: &mut Transaction<'_>,
queries: BatchQueryData,
parsed_headers: HttpHeaders,
) -> Result<String, SqlOverHttpError> {
let mut results = Vec::with_capacity(queries.queries.len());
let mut current_size = 0;
results: &mut json::ListSer<'_>,
) -> Result<(), SqlOverHttpError> {
for stmt in queries.queries {
let query = pin!(query_to_json(
config,
transaction,
stmt,
&mut current_size,
results.entry(),
parsed_headers,
));
let cancelled = pin!(cancel.cancelled());
let res = select(query, cancelled).await;
match res {
// TODO: maybe we should check that the transaction bit is set here
Either::Left((Ok((_, values)), _cancelled)) => {
results.push(values);
}
Either::Left((Ok(_), _cancelled)) => {}
Either::Left((Err(e), _cancelled)) => {
return Err(e);
}
@@ -872,8 +864,22 @@ async fn query_batch(
}
}
let results = json!({ "results": results });
let json_output = serde_json::to_string(&results).expect("json serialization should not fail");
Ok(())
}
async fn query_batch_to_json(
config: &'static HttpConfig,
cancel: CancellationToken,
tx: &mut Transaction<'_>,
queries: BatchQueryData,
headers: HttpHeaders,
) -> Result<String, SqlOverHttpError> {
let json_output = json::value_to_string!(|obj| json::value_as_object!(|obj| {
let results = obj.key("results");
json::value_as_list!(|results| {
query_batch(config, cancel, tx, queries, headers, results).await?;
});
}));
Ok(json_output)
}
@@ -882,54 +888,54 @@ async fn query_to_json<T: GenericClient>(
config: &'static HttpConfig,
client: &mut T,
data: QueryData,
current_size: &mut usize,
output: json::ValueSer<'_>,
parsed_headers: HttpHeaders,
) -> Result<(ReadyForQueryStatus, impl Serialize + use<T>), SqlOverHttpError> {
) -> Result<ReadyForQueryStatus, SqlOverHttpError> {
let query_start = Instant::now();
let query_params = data.params;
let mut output = json::ObjectSer::new(output);
let mut row_stream = client
.query_raw_txt(&data.query, query_params)
.query_raw_txt(&data.query, data.params)
.await
.map_err(SqlOverHttpError::Postgres)?;
let query_acknowledged = Instant::now();
let columns_len = row_stream.statement.columns().len();
let mut fields = Vec::with_capacity(columns_len);
let mut json_fields = output.key("fields").list();
for c in row_stream.statement.columns() {
fields.push(json!({
"name": c.name().to_owned(),
"dataTypeID": c.type_().oid(),
"tableID": c.table_oid(),
"columnID": c.column_id(),
"dataTypeSize": c.type_size(),
"dataTypeModifier": c.type_modifier(),
"format": "text",
}));
let json_field = json_fields.entry();
json::value_as_object!(|json_field| {
json_field.entry("name", c.name());
json_field.entry("dataTypeID", c.type_().oid());
json_field.entry("tableID", c.table_oid());
json_field.entry("columnID", c.column_id());
json_field.entry("dataTypeSize", c.type_size());
json_field.entry("dataTypeModifier", c.type_modifier());
json_field.entry("format", "text");
});
}
json_fields.finish();
let raw_output = parsed_headers.raw_output;
let array_mode = data.array_mode.unwrap_or(parsed_headers.default_array_mode);
let raw_output = parsed_headers.raw_output;
// Manually drain the stream into a vector to leave row_stream hanging
// around to get a command tag. Also check that the response is not too
// big.
let mut rows = Vec::new();
let mut rows = 0;
let mut json_rows = output.key("rows").list();
while let Some(row) = row_stream.next().await {
let row = row.map_err(SqlOverHttpError::Postgres)?;
*current_size += row.body_len();
// we don't have a streaming response support yet so this is to prevent OOM
// from a malicious query (eg a cross join)
if *current_size > config.max_response_size_bytes {
if json_rows.as_buffer().len() > config.max_response_size_bytes {
return Err(SqlOverHttpError::ResponseTooLarge(
config.max_response_size_bytes,
));
}
let row = pg_text_row_to_json(&row, raw_output, array_mode)?;
rows.push(row);
pg_text_row_to_json(json_rows.entry(), &row, raw_output, array_mode)?;
rows += 1;
// assumption: parsing pg text and converting to json takes CPU time.
// let's assume it is slightly expensive, so we should consume some cooperative budget.
@@ -937,16 +943,14 @@ async fn query_to_json<T: GenericClient>(
// of rows and never hit the tokio mpsc for a long time (although unlikely).
tokio::task::consume_budget().await;
}
json_rows.finish();
let query_resp_end = Instant::now();
let RowStream {
command_tag,
status: ready,
..
} = row_stream;
let ready = row_stream.status;
// grab the command tag and number of rows affected
let command_tag = command_tag.unwrap_or_default();
let command_tag = row_stream.command_tag.unwrap_or_default();
let mut command_tag_split = command_tag.split(' ');
let command_tag_name = command_tag_split.next().unwrap_or_default();
let command_tag_count = if command_tag_name == "INSERT" {
@@ -959,7 +963,7 @@ async fn query_to_json<T: GenericClient>(
.and_then(|s| s.parse::<i64>().ok());
info!(
rows = rows.len(),
rows,
?ready,
command_tag,
acknowledgement = ?(query_acknowledged - query_start),
@@ -967,16 +971,12 @@ async fn query_to_json<T: GenericClient>(
"finished executing query"
);
// Resulting JSON format is based on the format of node-postgres result.
let results = json!({
"command": command_tag_name.to_string(),
"rowCount": command_tag_count,
"rows": rows,
"fields": fields,
"rowAsArray": array_mode,
});
output.entry("command", command_tag_name);
output.entry("rowCount", command_tag_count);
output.entry("rowAsArray", array_mode);
Ok((ready, results))
output.finish();
Ok(ready)
}
enum Client {

View File

@@ -850,6 +850,31 @@ async fn handle_tenant_describe(
json_response(StatusCode::OK, service.tenant_describe(tenant_id)?)
}
/* BEGIN_HADRON */
async fn handle_tenant_timeline_describe(
service: Arc<Service>,
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Scrubber)?;
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
return res;
}
ForwardOutcome::NotForwarded(_req) => {}
};
json_response(
StatusCode::OK,
service
.tenant_timeline_describe(tenant_id, timeline_id)
.await?,
)
}
/* END_HADRON */
async fn handle_tenant_list(
service: Arc<Service>,
req: Request<Body>,
@@ -2480,6 +2505,13 @@ pub fn make_router(
)
})
// Timeline operations
.get("/control/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
tenant_service_handler(
r,
handle_tenant_timeline_describe,
RequestName("v1_tenant_timeline_describe"),
)
})
.delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
tenant_service_handler(
r,

View File

@@ -222,6 +222,9 @@ struct Cli {
/// Primarily useful for testing to reduce test execution time.
#[arg(long, default_value = "false", action=ArgAction::Set)]
kick_secondary_downloads: bool,
#[arg(long)]
shard_split_request_timeout: Option<humantime::Duration>,
}
enum StrictMode {
@@ -470,6 +473,10 @@ async fn async_main() -> anyhow::Result<()> {
timeline_safekeeper_count: args.timeline_safekeeper_count,
posthog_config: posthog_config.clone(),
kick_secondary_downloads: args.kick_secondary_downloads,
shard_split_request_timeout: args
.shard_split_request_timeout
.map(humantime::Duration::into)
.unwrap_or(Duration::MAX),
};
// Validate that we can connect to the database

View File

@@ -86,6 +86,23 @@ impl PageserverClient {
)
}
/* BEGIN_HADRON */
pub(crate) async fn tenant_timeline_describe(
&self,
tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,
) -> Result<TimelineInfo> {
measured_request!(
"tenant_timeline_describe",
crate::metrics::Method::Get,
&self.node_id_label,
self.inner
.tenant_timeline_describe(tenant_shard_id, timeline_id,)
.await
)
}
/* END_HADRON */
pub(crate) async fn tenant_scan_remote_storage(
&self,
tenant_id: TenantId,

View File

@@ -32,7 +32,7 @@ use pageserver_api::controller_api::{
ShardSchedulingPolicy, ShardsPreferredAzsRequest, ShardsPreferredAzsResponse,
SkSchedulingPolicy, TenantCreateRequest, TenantCreateResponse, TenantCreateResponseShard,
TenantDescribeResponse, TenantDescribeResponseShard, TenantLocateResponse, TenantPolicyRequest,
TenantShardMigrateRequest, TenantShardMigrateResponse,
TenantShardMigrateRequest, TenantShardMigrateResponse, TenantTimelineDescribeResponse,
};
use pageserver_api::models::{
self, DetachBehavior, LocationConfig, LocationConfigListResponse, LocationConfigMode, LsnLease,
@@ -60,6 +60,7 @@ use tokio::sync::mpsc::error::TrySendError;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
use utils::completion::Barrier;
use utils::env;
use utils::generation::Generation;
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
@@ -483,6 +484,9 @@ pub struct Config {
/// When set, actively checks and initiates heatmap downloads/uploads.
pub kick_secondary_downloads: bool,
/// Timeout used for HTTP client of split requests. [`Duration::MAX`] if None.
pub shard_split_request_timeout: Duration,
}
impl From<DatabaseError> for ApiError {
@@ -5206,6 +5210,9 @@ impl Service {
match res {
Ok(ok) => Ok(ok),
Err(mgmt_api::Error::ApiError(StatusCode::CONFLICT, _)) => Ok(StatusCode::CONFLICT),
Err(mgmt_api::Error::ApiError(StatusCode::PRECONDITION_FAILED, msg)) if msg.contains("Requested tenant is missing") => {
Err(ApiError::ResourceUnavailable("Tenant migration in progress".into()))
},
Err(mgmt_api::Error::ApiError(StatusCode::SERVICE_UNAVAILABLE, msg)) => Err(ApiError::ResourceUnavailable(msg.into())),
Err(e) => {
Err(
@@ -5486,6 +5493,92 @@ impl Service {
.ok_or_else(|| ApiError::NotFound(anyhow::anyhow!("Tenant {tenant_id} not found").into()))
}
/* BEGIN_HADRON */
pub(crate) async fn tenant_timeline_describe(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<TenantTimelineDescribeResponse, ApiError> {
self.tenant_remote_mutation(tenant_id, |locations| async move {
if locations.0.is_empty() {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant not found").into(),
));
};
let locations: Vec<(TenantShardId, Node)> = locations
.0
.iter()
.map(|t| (*t.0, t.1.latest.node.clone()))
.collect();
let mut futs = FuturesUnordered::new();
for (shard_id, node) in locations {
futs.push({
async move {
let result = node
.with_client_retries(
|client| async move {
client
.tenant_timeline_describe(&shard_id, &timeline_id)
.await
},
&self.http_client,
&self.config.pageserver_jwt_token,
3,
3,
Duration::from_secs(30),
&self.cancel,
)
.await;
(result, shard_id, node.get_id())
}
});
}
let mut results: Vec<TimelineInfo> = Vec::new();
while let Some((result, tenant_shard_id, node_id)) = futs.next().await {
match result {
Some(Ok(timeline_info)) => results.push(timeline_info),
Some(Err(e)) => {
tracing::warn!(
"Failed to describe tenant {} timeline {} for pageserver {}: {e}",
tenant_shard_id,
timeline_id,
node_id,
);
return Err(ApiError::ResourceUnavailable(format!("{e}").into()));
}
None => return Err(ApiError::Cancelled),
}
}
let mut image_consistent_lsn: Option<Lsn> = Some(Lsn::MAX);
for timeline_info in &results {
if let Some(tline_image_consistent_lsn) = timeline_info.image_consistent_lsn {
image_consistent_lsn = Some(std::cmp::min(
image_consistent_lsn.unwrap(),
tline_image_consistent_lsn,
));
} else {
tracing::warn!(
"Timeline {} on shard {} does not have image consistent lsn",
timeline_info.timeline_id,
timeline_info.tenant_id
);
image_consistent_lsn = None;
break;
}
}
Ok(TenantTimelineDescribeResponse {
shards: results,
image_consistent_lsn,
})
})
.await?
}
/* END_HADRON */
/// limit & offset are pagination parameters. Since we are walking an in-memory HashMap, `offset` does not
/// avoid traversing data, it just avoid returning it. This is suitable for our purposes, since our in memory
/// maps are small enough to traverse fast, our pagination is just to avoid serializing huge JSON responses
@@ -6317,18 +6410,39 @@ impl Service {
// TODO: issue split calls concurrently (this only matters once we're splitting
// N>1 shards into M shards -- initially we're usually splitting 1 shard into N).
// HADRON: set a timeout for splitting individual shards on page servers.
// Currently we do not perform any retry because it's not clear if page server can handle
// partially split shards correctly.
let shard_split_timeout =
if let Some(env::DeploymentMode::Local) = env::get_deployment_mode() {
Duration::from_secs(30)
} else {
self.config.shard_split_request_timeout
};
let mut http_client_builder = reqwest::ClientBuilder::new()
.pool_max_idle_per_host(0)
.timeout(shard_split_timeout);
for ssl_ca_cert in &self.config.ssl_ca_certs {
http_client_builder = http_client_builder.add_root_certificate(ssl_ca_cert.clone());
}
let http_client = http_client_builder
.build()
.expect("Failed to construct HTTP client");
for target in &targets {
let ShardSplitTarget {
parent_id,
node,
child_ids,
} = target;
let client = PageserverClient::new(
node.get_id(),
self.http_client.clone(),
http_client.clone(),
node.base_url(),
self.config.pageserver_jwt_token.as_deref(),
);
let response = client
.tenant_shard_split(
*parent_id,

View File

@@ -25,7 +25,8 @@ use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo};
use safekeeper_api::PgVersionId;
use safekeeper_api::membership::{self, MemberSet, SafekeeperGeneration};
use safekeeper_api::models::{
PullTimelineRequest, TimelineMembershipSwitchRequest, TimelineMembershipSwitchResponse,
PullTimelineRequest, TimelineLocateResponse, TimelineMembershipSwitchRequest,
TimelineMembershipSwitchResponse,
};
use safekeeper_api::{INITIAL_TERM, Term};
use safekeeper_client::mgmt_api;
@@ -37,21 +38,14 @@ use utils::lsn::Lsn;
use super::Service;
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub struct TimelineLocateResponse {
pub generation: SafekeeperGeneration,
pub sk_set: Vec<NodeId>,
pub new_sk_set: Option<Vec<NodeId>>,
}
impl Service {
fn make_member_set(safekeepers: &[Safekeeper]) -> Result<MemberSet, ApiError> {
fn make_member_set(safekeepers: &[Safekeeper]) -> Result<MemberSet, anyhow::Error> {
let members = safekeepers
.iter()
.map(|sk| sk.get_safekeeper_id())
.collect::<Vec<_>>();
MemberSet::new(members).map_err(ApiError::InternalServerError)
MemberSet::new(members)
}
fn get_safekeepers(&self, ids: &[i64]) -> Result<Vec<Safekeeper>, ApiError> {
@@ -86,7 +80,7 @@ impl Service {
) -> Result<Vec<NodeId>, ApiError> {
let safekeepers = self.get_safekeepers(&timeline_persistence.sk_set)?;
let mset = Self::make_member_set(&safekeepers)?;
let mset = Self::make_member_set(&safekeepers).map_err(ApiError::InternalServerError)?;
let mconf = safekeeper_api::membership::Configuration::new(mset);
let req = safekeeper_api::models::TimelineCreateRequest {
@@ -1111,6 +1105,26 @@ impl Service {
}
}
if new_sk_set.is_empty() {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"new safekeeper set is empty"
)));
}
if new_sk_set.len() < self.config.timeline_safekeeper_count {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"new safekeeper set must have at least {} safekeepers",
self.config.timeline_safekeeper_count
)));
}
let new_sk_set_i64 = new_sk_set.iter().map(|id| id.0 as i64).collect::<Vec<_>>();
let new_safekeepers = self.get_safekeepers(&new_sk_set_i64)?;
// Construct new member set in advance to validate it.
// E.g. validates that there is no duplicate safekeepers.
let new_sk_member_set =
Self::make_member_set(&new_safekeepers).map_err(ApiError::BadRequest)?;
// TODO(diko): per-tenant lock is too wide. Consider introducing per-timeline locks.
let _tenant_lock = trace_shared_lock(
&self.tenant_op_locks,
@@ -1141,6 +1155,18 @@ impl Service {
.map(|&id| NodeId(id as u64))
.collect::<Vec<_>>();
// Validate that we are not migrating to a decomissioned safekeeper.
for sk in new_safekeepers.iter() {
if !cur_sk_set.contains(&sk.get_id())
&& sk.scheduling_policy() == SkSchedulingPolicy::Decomissioned
{
return Err(ApiError::BadRequest(anyhow::anyhow!(
"safekeeper {} is decomissioned",
sk.get_id()
)));
}
}
tracing::info!(
?cur_sk_set,
?new_sk_set,
@@ -1183,11 +1209,8 @@ impl Service {
}
let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?;
let cur_sk_member_set = Self::make_member_set(&cur_safekeepers)?;
let new_sk_set_i64 = new_sk_set.iter().map(|id| id.0 as i64).collect::<Vec<_>>();
let new_safekeepers = self.get_safekeepers(&new_sk_set_i64)?;
let new_sk_member_set = Self::make_member_set(&new_safekeepers)?;
let cur_sk_member_set =
Self::make_member_set(&cur_safekeepers).map_err(ApiError::InternalServerError)?;
let joint_config = membership::Configuration {
generation,

View File

@@ -2342,6 +2342,20 @@ class NeonStorageController(MetricsGetter, LogUtils):
response.raise_for_status()
return response.json()
# HADRON
def tenant_timeline_describe(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
):
response = self.request(
"GET",
f"{self.api}/control/v1/tenant/{tenant_id}/timeline/{timeline_id}",
headers=self.headers(TokenScope.ADMIN),
)
response.raise_for_status()
return response.json()
def nodes(self):
"""
:return: list of {"id": ""}
@@ -5406,6 +5420,7 @@ SKIP_FILES = frozenset(
(
"pg_internal.init",
"pg.log",
"neon.signal",
"zenith.signal",
"pg_hba.conf",
"postgresql.conf",

View File

@@ -115,8 +115,7 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
".*Local data loss suspected.*",
# Too many frozen layers error is normal during intensive benchmarks
".*too many frozen layers.*",
# Transient errors when resolving tenant shards by page service
".*Fail to resolve tenant shard in attempt.*",
".*Failed to resolve tenant shard after.*",
# Expected warnings when pageserver has not refreshed GC info yet
".*pitr LSN/interval not found, skipping force image creation LSN calculation.*",
".*No broker updates received for a while.*",

View File

@@ -7,6 +7,7 @@ import time
from enum import StrEnum
import pytest
from fixtures.common_types import TenantShardId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
@@ -960,9 +961,70 @@ def get_layer_map(env, tenant_shard_id, timeline_id, ps_id):
return image_layer_count, delta_layer_count
def test_image_creation_timeout(neon_env_builder: NeonEnvBuilder):
def test_image_layer_creation_time_threshold(neon_env_builder: NeonEnvBuilder):
"""
Tests that page server can force creating new images if image creation timeout is enabled
Tests that image layers can be created when the time threshold is reached on non-0 shards.
"""
tenant_conf = {
"compaction_threshold": "100",
"image_creation_threshold": "100",
"image_layer_creation_check_threshold": "1",
# disable distance based image layer creation check
"checkpoint_distance": 10 * 1024 * 1024 * 1024,
"checkpoint_timeout": "100ms",
"image_layer_force_creation_period": "1s",
"pitr_interval": "10s",
"gc_period": "1s",
"compaction_period": "1s",
"lsn_lease_length": "1s",
}
# consider every tenant large to run the image layer generation check more eagerly
neon_env_builder.pageserver_config_override = (
"image_layer_generation_large_timeline_threshold=0"
)
neon_env_builder.num_pageservers = 1
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start(
initial_tenant_conf=tenant_conf,
initial_tenant_shard_count=2,
initial_tenant_shard_stripe_size=1,
)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main")
endpoint.safe_psql("CREATE TABLE foo (id INTEGER, val text)")
for v in range(10):
endpoint.safe_psql(f"INSERT INTO foo (id, val) VALUES ({v}, repeat('abcde{v:0>3}', 500))")
tenant_shard_id = TenantShardId(tenant_id, 1, 2)
# Generate some rows.
for v in range(20):
endpoint.safe_psql(f"INSERT INTO foo (id, val) VALUES ({v}, repeat('abcde{v:0>3}', 500))")
# restart page server so that logical size on non-0 shards is missing
env.pageserver.restart()
(old_images, old_deltas) = get_layer_map(env, tenant_shard_id, timeline_id, 0)
log.info(f"old images: {old_images}, old deltas: {old_deltas}")
def check_image_creation():
(new_images, old_deltas) = get_layer_map(env, tenant_shard_id, timeline_id, 0)
log.info(f"images: {new_images}, deltas: {old_deltas}")
assert new_images > old_images
wait_until(check_image_creation)
endpoint.stop_and_destroy()
def test_image_layer_force_creation_period(neon_env_builder: NeonEnvBuilder):
"""
Tests that page server can force creating new images if image_layer_force_creation_period is enabled
"""
# use large knobs to disable L0 compaction/image creation except for the force image creation
tenant_conf = {
@@ -972,10 +1034,10 @@ def test_image_creation_timeout(neon_env_builder: NeonEnvBuilder):
"checkpoint_distance": 10 * 1024,
"checkpoint_timeout": "1s",
"image_layer_force_creation_period": "1s",
# The lsn for forced image layer creations is calculated once every 10 minutes.
# Hence, drive compaction manually such that the test doesn't compute it at the
# wrong time.
"compaction_period": "0s",
"pitr_interval": "10s",
"gc_period": "1s",
"compaction_period": "1s",
"lsn_lease_length": "1s",
}
# consider every tenant large to run the image layer generation check more eagerly
@@ -1018,4 +1080,69 @@ def test_image_creation_timeout(neon_env_builder: NeonEnvBuilder):
)
def test_image_consistent_lsn(neon_env_builder: NeonEnvBuilder):
"""
Test the /v1/tenant/<tenant_id>/timeline/<timeline_id> endpoint and the computation of image_consistent_lsn
"""
# use large knobs to disable L0 compaction/image creation except for the force image creation
tenant_conf = {
"compaction_threshold": "100",
"image_creation_threshold": "100",
"image_layer_creation_check_threshold": "1",
"checkpoint_distance": 10 * 1024,
"checkpoint_timeout": "1s",
"image_layer_force_creation_period": "1s",
"pitr_interval": "10s",
"gc_period": "1s",
"compaction_period": "1s",
"lsn_lease_length": "1s",
}
neon_env_builder.num_pageservers = 2
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start(
initial_tenant_conf=tenant_conf,
initial_tenant_shard_count=4,
initial_tenant_shard_stripe_size=1,
)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main")
endpoint.safe_psql("CREATE TABLE foo (id INTEGER, val text)")
for v in range(10):
endpoint.safe_psql(
f"INSERT INTO foo (id, val) VALUES ({v}, repeat('abcde{v:0>3}', 500))", log_query=False
)
response = env.storage_controller.tenant_timeline_describe(tenant_id, timeline_id)
shards = response["shards"]
for shard in shards:
assert shard["image_consistent_lsn"] is not None
image_consistent_lsn = response["image_consistent_lsn"]
assert image_consistent_lsn is not None
# do more writes and wait for image_consistent_lsn to advance
for v in range(100):
endpoint.safe_psql(
f"INSERT INTO foo (id, val) VALUES ({v}, repeat('abcde{v:0>3}', 500))", log_query=False
)
def check_image_consistent_lsn_advanced():
response = env.storage_controller.tenant_timeline_describe(tenant_id, timeline_id)
new_image_consistent_lsn = response["image_consistent_lsn"]
shards = response["shards"]
for shard in shards:
print(f"shard {shard['tenant_id']} image_consistent_lsn{shard['image_consistent_lsn']}")
assert new_image_consistent_lsn != image_consistent_lsn
wait_until(check_image_consistent_lsn_advanced)
endpoint.stop_and_destroy()
for ps in env.pageservers:
ps.allowed_errors.append(".*created delta file of size.*larger than double of target.*")
# END_HADRON

View File

@@ -2,6 +2,9 @@ from __future__ import annotations
from typing import TYPE_CHECKING
import pytest
from fixtures.neon_fixtures import StorageControllerApiException
if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnvBuilder
@@ -75,3 +78,38 @@ def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder):
ep.start(safekeeper_generation=1, safekeepers=[3])
assert ep.safe_psql("SELECT * FROM t") == [(i,) for i in range(1, 4)]
def test_new_sk_set_validation(neon_env_builder: NeonEnvBuilder):
"""
Test that safekeeper_migrate validates the new_sk_set before starting the migration.
"""
neon_env_builder.num_safekeepers = 3
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": True,
"timeline_safekeeper_count": 2,
}
env = neon_env_builder.init_start()
def expect_fail(sk_set: list[int], match: str):
with pytest.raises(StorageControllerApiException, match=match):
env.storage_controller.migrate_safekeepers(
env.initial_tenant, env.initial_timeline, sk_set
)
# Check that we failed before commiting to the database.
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
assert mconf["generation"] == 1
expect_fail([], "safekeeper set is empty")
expect_fail([1], "must have at least 2 safekeepers")
expect_fail([1, 1], "duplicate safekeeper")
expect_fail([1, 100500], "does not exist")
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
sk_set = mconf["sk_set"]
assert len(sk_set) == 2
decom_sk = [sk.id for sk in env.safekeepers if sk.id not in sk_set][0]
env.storage_controller.safekeeper_scheduling_policy(decom_sk, "Decomissioned")
expect_fail([sk_set[0], decom_sk], "decomissioned")

View File

@@ -1673,6 +1673,91 @@ def test_shard_resolve_during_split_abort(neon_env_builder: NeonEnvBuilder):
# END_HADRON
# HADRON
@pytest.mark.skip(reason="The backpressure change has not been merged yet.")
def test_back_pressure_per_shard(neon_env_builder: NeonEnvBuilder):
"""
Tests back pressure knobs are enforced on the per shard basis instead of at the tenant level.
"""
init_shard_count = 4
neon_env_builder.num_pageservers = init_shard_count
stripe_size = 1
env = neon_env_builder.init_start(
initial_tenant_shard_count=init_shard_count,
initial_tenant_shard_stripe_size=stripe_size,
initial_tenant_conf={
# disable auto-flush of shards and set max_replication_flush_lag as 15MB.
# The backpressure parameters must be enforced at the shard level to avoid stalling PG.
"checkpoint_distance": 1 * 1024 * 1024 * 1024,
"checkpoint_timeout": "1h",
},
)
endpoint = env.endpoints.create(
"main",
config_lines=[
"max_replication_write_lag = 0",
"max_replication_apply_lag = 0",
"max_replication_flush_lag = 15MB",
"neon.max_cluster_size = 10GB",
],
)
endpoint.respec(skip_pg_catalog_updates=False) # Needed for databricks_system to get created.
endpoint.start()
# generate 20MB of data
endpoint.safe_psql(
"CREATE TABLE usertable AS SELECT s AS KEY, repeat('a', 1000) as VALUE from generate_series(1, 20000) s;"
)
res = endpoint.safe_psql(
"SELECT neon.backpressure_throttling_time() as throttling_time", dbname="databricks_system"
)[0]
assert res[0] == 0, f"throttling_time should be 0, but got {res[0]}"
endpoint.stop()
# HADRON
def test_shard_split_page_server_timeout(neon_env_builder: NeonEnvBuilder):
"""
Tests that shard split can correctly handle page server timeouts and abort the split
"""
init_shard_count = 2
neon_env_builder.num_pageservers = 1
stripe_size = 1
if neon_env_builder.storage_controller_config is None:
neon_env_builder.storage_controller_config = {"shard_split_request_timeout": "5s"}
else:
neon_env_builder.storage_controller_config["shard_split_request_timeout"] = "5s"
env = neon_env_builder.init_start(
initial_tenant_shard_count=init_shard_count,
initial_tenant_shard_stripe_size=stripe_size,
)
env.storage_controller.allowed_errors.extend(
[
".*Enqueuing background abort.*",
".*failpoint.*",
".*Failed to abort.*",
".*Exclusive lock by ShardSplit was held.*",
]
)
env.pageserver.allowed_errors.extend([".*request was dropped before completing.*"])
endpoint1 = env.endpoints.create_start(branch_name="main")
env.pageserver.http_client().configure_failpoints(("shard-split-post-finish-pause", "pause"))
with pytest.raises(StorageControllerApiException):
env.storage_controller.tenant_shard_split(env.initial_tenant, shard_count=4)
env.pageserver.http_client().configure_failpoints(("shard-split-post-finish-pause", "off"))
endpoint1.stop_and_destroy()
def test_sharding_backpressure(neon_env_builder: NeonEnvBuilder):
"""
Check a scenario when one of the shards is much slower than others.

View File

@@ -209,9 +209,9 @@ def test_ancestor_detach_branched_from(
client.timeline_delete(env.initial_tenant, env.initial_timeline)
wait_timeline_detail_404(client, env.initial_tenant, env.initial_timeline)
# because we do the fullbackup from ancestor at the branch_lsn, the zenith.signal is always different
# as there is always "PREV_LSN: invalid" for "before"
skip_files = {"zenith.signal"}
# because we do the fullbackup from ancestor at the branch_lsn, the neon.signal and/or zenith.signal is always
# different as there is always "PREV_LSN: invalid" for "before"
skip_files = {"zenith.signal", "neon.signal"}
assert_pageserver_backups_equal(fullbackup_before, fullbackup_after, skip_files)
@@ -767,7 +767,7 @@ def test_compaction_induced_by_detaches_in_history(
env.pageserver, env.initial_tenant, branch_timeline_id, branch_lsn, fullbackup_after
)
# we don't need to skip any files, because zenith.signal will be identical
# we don't need to skip any files, because neon.signal will be identical
assert_pageserver_backups_equal(fullbackup_before, fullbackup_after, set())

View File

@@ -1,18 +1,18 @@
{
"v17": [
"17.5",
"db424d42d748f8ad91ac00e28db2c7f2efa42f7f"
"353c725b0c76cc82b15af21d8360d03391dc6814"
],
"v16": [
"16.9",
"7a4c0eacaeb9b97416542fa19103061c166460b1"
"e08c8d5f1576ca0487d14d154510499c5f12adfb"
],
"v15": [
"15.13",
"8c3249f36c7df6ac0efb8ee9f1baf4aa1b83e5c9"
"afd46987f3da50c9146a8aa59380052df0862c06"
],
"v14": [
"14.18",
"9085654ee8022d5cc4ca719380a1dc53e5e3246f"
"8ce1f52303aec29e098309347b57c01a1962e221"
]
}