mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 22:10:39 +00:00
Compare commits
81 Commits
rc/release
...
release-85
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1821b6ea3f | ||
|
|
93b964f829 | ||
|
|
d0aaec2abb | ||
|
|
d0dc65da12 | ||
|
|
03d635b916 | ||
|
|
5cd7f936f9 | ||
|
|
be3686e2af | ||
|
|
1c3cb18c60 | ||
|
|
946a0667eb | ||
|
|
e5d15450ec | ||
|
|
d294078d1f | ||
|
|
eb70d7a55c | ||
|
|
20723ea039 | ||
|
|
db95540975 | ||
|
|
90033fe693 | ||
|
|
cb9d439cc1 | ||
|
|
7a2f0c4d53 | ||
|
|
60c907cbdb | ||
|
|
2eb43a5d81 | ||
|
|
b2f45fe37f | ||
|
|
3904a0fe4f | ||
|
|
cd07b120b5 | ||
|
|
7c06a33df0 | ||
|
|
85992000e3 | ||
|
|
c33cf739e3 | ||
|
|
1812fc7cf1 | ||
|
|
929d963a35 | ||
|
|
e405420458 | ||
|
|
1b5258ef6a | ||
|
|
9fbb33e9d9 | ||
|
|
93983ac5fc | ||
|
|
72dd540c87 | ||
|
|
612d0aea4f | ||
|
|
aa3a75a0a7 | ||
|
|
6389c9184c | ||
|
|
e177927476 | ||
|
|
e9bbafebbd | ||
|
|
7430fb9836 | ||
|
|
c45d169527 | ||
|
|
a1e67cfe86 | ||
|
|
0263c92c47 | ||
|
|
5fc599d653 | ||
|
|
66d2592d04 | ||
|
|
517ae7a60e | ||
|
|
b2a769cc86 | ||
|
|
12f0e525c6 | ||
|
|
6c6e5bfc2b | ||
|
|
9c0fefee25 | ||
|
|
97e2e27f68 | ||
|
|
cea2b222d0 | ||
|
|
5b2afd953c | ||
|
|
74e789b155 | ||
|
|
235439e639 | ||
|
|
a78438f15c | ||
|
|
3d370679a1 | ||
|
|
c37e3020ab | ||
|
|
a036708da1 | ||
|
|
1e7ad80ee7 | ||
|
|
581be23100 | ||
|
|
8ca7ea859d | ||
|
|
efea8223bb | ||
|
|
d3d3bfc6d0 | ||
|
|
b3a911ff8c | ||
|
|
a54853abd5 | ||
|
|
69007f7ac8 | ||
|
|
d255fa4b7e | ||
|
|
40d6b3a34e | ||
|
|
a018878e27 | ||
|
|
e5b3eb1e64 | ||
|
|
f35e1356a1 | ||
|
|
4dec0dddc6 | ||
|
|
e0c504af38 | ||
|
|
3399eea2ed | ||
|
|
6a29c809d5 | ||
|
|
a62c01df4c | ||
|
|
4c093c6314 | ||
|
|
32f58f8228 | ||
|
|
96c36c0894 | ||
|
|
d719709316 | ||
|
|
97912f19fc | ||
|
|
49724aa3b6 |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -1303,6 +1303,7 @@ dependencies = [
|
||||
"futures",
|
||||
"http 1.1.0",
|
||||
"indexmap 2.0.1",
|
||||
"itertools 0.10.5",
|
||||
"jsonwebtoken",
|
||||
"metrics",
|
||||
"nix 0.27.1",
|
||||
|
||||
@@ -28,6 +28,7 @@ flate2.workspace = true
|
||||
futures.workspace = true
|
||||
http.workspace = true
|
||||
indexmap.workspace = true
|
||||
itertools.workspace = true
|
||||
jsonwebtoken.workspace = true
|
||||
metrics.workspace = true
|
||||
nix.workspace = true
|
||||
|
||||
@@ -60,12 +60,16 @@ use utils::failpoint_support;
|
||||
// Compatibility hack: if the control plane specified any remote-ext-config
|
||||
// use the default value for extension storage proxy gateway.
|
||||
// Remove this once the control plane is updated to pass the gateway URL
|
||||
fn parse_remote_ext_config(arg: &str) -> Result<String> {
|
||||
if arg.starts_with("http") {
|
||||
Ok(arg.trim_end_matches('/').to_string())
|
||||
fn parse_remote_ext_base_url(arg: &str) -> Result<String> {
|
||||
const FALLBACK_PG_EXT_GATEWAY_BASE_URL: &str =
|
||||
"http://pg-ext-s3-gateway.pg-ext-s3-gateway.svc.cluster.local";
|
||||
|
||||
Ok(if arg.starts_with("http") {
|
||||
arg
|
||||
} else {
|
||||
Ok("http://pg-ext-s3-gateway".to_string())
|
||||
FALLBACK_PG_EXT_GATEWAY_BASE_URL
|
||||
}
|
||||
.to_owned())
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
@@ -74,8 +78,10 @@ struct Cli {
|
||||
#[arg(short = 'b', long, default_value = "postgres", env = "POSTGRES_PATH")]
|
||||
pub pgbin: String,
|
||||
|
||||
#[arg(short = 'r', long, value_parser = parse_remote_ext_config)]
|
||||
pub remote_ext_config: Option<String>,
|
||||
/// The base URL for the remote extension storage proxy gateway.
|
||||
/// Should be in the form of `http(s)://<gateway-hostname>[:<port>]`.
|
||||
#[arg(short = 'r', long, value_parser = parse_remote_ext_base_url, alias = "remote-ext-config")]
|
||||
pub remote_ext_base_url: Option<String>,
|
||||
|
||||
/// The port to bind the external listening HTTP server to. Clients running
|
||||
/// outside the compute will talk to the compute through this port. Keep
|
||||
@@ -164,7 +170,7 @@ fn main() -> Result<()> {
|
||||
pgversion: get_pg_version_string(&cli.pgbin),
|
||||
external_http_port: cli.external_http_port,
|
||||
internal_http_port: cli.internal_http_port,
|
||||
ext_remote_storage: cli.remote_ext_config.clone(),
|
||||
remote_ext_base_url: cli.remote_ext_base_url.clone(),
|
||||
resize_swap_on_bind: cli.resize_swap_on_bind,
|
||||
set_disk_quota_for_fs: cli.set_disk_quota_for_fs,
|
||||
#[cfg(target_os = "linux")]
|
||||
@@ -265,4 +271,18 @@ mod test {
|
||||
fn verify_cli() {
|
||||
Cli::command().debug_assert()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_pg_ext_gateway_base_url() {
|
||||
let arg = "http://pg-ext-s3-gateway2";
|
||||
let result = super::parse_remote_ext_base_url(arg).unwrap();
|
||||
assert_eq!(result, arg);
|
||||
|
||||
let arg = "pg-ext-s3-gateway";
|
||||
let result = super::parse_remote_ext_base_url(arg).unwrap();
|
||||
assert_eq!(
|
||||
result,
|
||||
"http://pg-ext-s3-gateway.pg-ext-s3-gateway.svc.cluster.local"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ use compute_api::spec::{
|
||||
use futures::StreamExt;
|
||||
use futures::future::join_all;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use itertools::Itertools;
|
||||
use nix::sys::signal::{Signal, kill};
|
||||
use nix::unistd::Pid;
|
||||
use once_cell::sync::Lazy;
|
||||
@@ -18,7 +19,7 @@ use postgres;
|
||||
use postgres::NoTls;
|
||||
use postgres::error::SqlState;
|
||||
use remote_storage::{DownloadError, RemotePath};
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::net::SocketAddr;
|
||||
use std::os::unix::fs::{PermissionsExt, symlink};
|
||||
use std::path::Path;
|
||||
@@ -95,7 +96,7 @@ pub struct ComputeNodeParams {
|
||||
pub internal_http_port: u16,
|
||||
|
||||
/// the address of extension storage proxy gateway
|
||||
pub ext_remote_storage: Option<String>,
|
||||
pub remote_ext_base_url: Option<String>,
|
||||
}
|
||||
|
||||
/// Compute node info shared across several `compute_ctl` threads.
|
||||
@@ -1896,9 +1897,9 @@ LIMIT 100",
|
||||
real_ext_name: String,
|
||||
ext_path: RemotePath,
|
||||
) -> Result<u64, DownloadError> {
|
||||
let ext_remote_storage =
|
||||
let remote_ext_base_url =
|
||||
self.params
|
||||
.ext_remote_storage
|
||||
.remote_ext_base_url
|
||||
.as_ref()
|
||||
.ok_or(DownloadError::BadInput(anyhow::anyhow!(
|
||||
"Remote extensions storage is not configured",
|
||||
@@ -1960,7 +1961,7 @@ LIMIT 100",
|
||||
let download_size = extension_server::download_extension(
|
||||
&real_ext_name,
|
||||
&ext_path,
|
||||
ext_remote_storage,
|
||||
remote_ext_base_url,
|
||||
&self.params.pgbin,
|
||||
)
|
||||
.await
|
||||
@@ -1995,23 +1996,40 @@ LIMIT 100",
|
||||
tokio::spawn(conn);
|
||||
|
||||
// TODO: support other types of grants apart from schemas?
|
||||
let query = format!(
|
||||
"GRANT {} ON SCHEMA {} TO {}",
|
||||
privileges
|
||||
.iter()
|
||||
// should not be quoted as it's part of the command.
|
||||
// is already sanitized so it's ok
|
||||
.map(|p| p.as_str())
|
||||
.collect::<Vec<&'static str>>()
|
||||
.join(", "),
|
||||
// quote the schema and role name as identifiers to sanitize them.
|
||||
schema_name.pg_quote(),
|
||||
role_name.pg_quote(),
|
||||
);
|
||||
db_client
|
||||
.simple_query(&query)
|
||||
|
||||
// check the role grants first - to gracefully handle read-replicas.
|
||||
let select = "SELECT privilege_type
|
||||
FROM pg_namespace
|
||||
JOIN LATERAL (SELECT * FROM aclexplode(nspacl) AS x) acl ON true
|
||||
JOIN pg_user users ON acl.grantee = users.usesysid
|
||||
WHERE users.usename = $1
|
||||
AND nspname = $2";
|
||||
let rows = db_client
|
||||
.query(select, &[role_name, schema_name])
|
||||
.await
|
||||
.with_context(|| format!("Failed to execute query: {}", query))?;
|
||||
.with_context(|| format!("Failed to execute query: {select}"))?;
|
||||
|
||||
let already_granted: HashSet<String> = rows.into_iter().map(|row| row.get(0)).collect();
|
||||
|
||||
let grants = privileges
|
||||
.iter()
|
||||
.filter(|p| !already_granted.contains(p.as_str()))
|
||||
// should not be quoted as it's part of the command.
|
||||
// is already sanitized so it's ok
|
||||
.map(|p| p.as_str())
|
||||
.join(", ");
|
||||
|
||||
if !grants.is_empty() {
|
||||
// quote the schema and role name as identifiers to sanitize them.
|
||||
let schema_name = schema_name.pg_quote();
|
||||
let role_name = role_name.pg_quote();
|
||||
|
||||
let query = format!("GRANT {grants} ON SCHEMA {schema_name} TO {role_name}",);
|
||||
db_client
|
||||
.simple_query(&query)
|
||||
.await
|
||||
.with_context(|| format!("Failed to execute query: {}", query))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -2069,7 +2087,7 @@ LIMIT 100",
|
||||
&self,
|
||||
spec: &ComputeSpec,
|
||||
) -> Result<RemoteExtensionMetrics> {
|
||||
if self.params.ext_remote_storage.is_none() {
|
||||
if self.params.remote_ext_base_url.is_none() {
|
||||
return Ok(RemoteExtensionMetrics {
|
||||
num_ext_downloaded: 0,
|
||||
largest_ext_size: 0,
|
||||
|
||||
@@ -158,14 +158,14 @@ fn parse_pg_version(human_version: &str) -> PostgresMajorVersion {
|
||||
pub async fn download_extension(
|
||||
ext_name: &str,
|
||||
ext_path: &RemotePath,
|
||||
ext_remote_storage: &str,
|
||||
remote_ext_base_url: &str,
|
||||
pgbin: &str,
|
||||
) -> Result<u64> {
|
||||
info!("Download extension {:?} from {:?}", ext_name, ext_path);
|
||||
|
||||
// TODO add retry logic
|
||||
let download_buffer =
|
||||
match download_extension_tar(ext_remote_storage, &ext_path.to_string()).await {
|
||||
match download_extension_tar(remote_ext_base_url, &ext_path.to_string()).await {
|
||||
Ok(buffer) => buffer,
|
||||
Err(error_message) => {
|
||||
return Err(anyhow::anyhow!(
|
||||
@@ -272,8 +272,8 @@ pub fn create_control_files(remote_extensions: &RemoteExtSpec, pgbin: &str) {
|
||||
// Do request to extension storage proxy, e.g.,
|
||||
// curl http://pg-ext-s3-gateway/latest/v15/extensions/anon.tar.zst
|
||||
// using HTTP GET and return the response body as bytes.
|
||||
async fn download_extension_tar(ext_remote_storage: &str, ext_path: &str) -> Result<Bytes> {
|
||||
let uri = format!("{}/{}", ext_remote_storage, ext_path);
|
||||
async fn download_extension_tar(remote_ext_base_url: &str, ext_path: &str) -> Result<Bytes> {
|
||||
let uri = format!("{}/{}", remote_ext_base_url, ext_path);
|
||||
let filename = Path::new(ext_path)
|
||||
.file_name()
|
||||
.unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
|
||||
|
||||
@@ -22,7 +22,7 @@ pub(in crate::http) async fn download_extension(
|
||||
State(compute): State<Arc<ComputeNode>>,
|
||||
) -> Response {
|
||||
// Don't even try to download extensions if no remote storage is configured
|
||||
if compute.params.ext_remote_storage.is_none() {
|
||||
if compute.params.remote_ext_base_url.is_none() {
|
||||
return JsonResponse::error(
|
||||
StatusCode::PRECONDITION_FAILED,
|
||||
"remote storage is not configured",
|
||||
|
||||
@@ -644,9 +644,10 @@ struct EndpointStartCmdArgs {
|
||||
|
||||
#[clap(
|
||||
long,
|
||||
help = "Configure the remote extensions storage proxy gateway to request for extensions."
|
||||
help = "Configure the remote extensions storage proxy gateway URL to request for extensions.",
|
||||
alias = "remote-ext-config"
|
||||
)]
|
||||
remote_ext_config: Option<String>,
|
||||
remote_ext_base_url: Option<String>,
|
||||
|
||||
#[clap(
|
||||
long,
|
||||
@@ -1414,7 +1415,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
|
||||
EndpointCmd::Start(args) => {
|
||||
let endpoint_id = &args.endpoint_id;
|
||||
let pageserver_id = args.endpoint_pageserver_id;
|
||||
let remote_ext_config = &args.remote_ext_config;
|
||||
let remote_ext_base_url = &args.remote_ext_base_url;
|
||||
|
||||
let default_generation = env
|
||||
.storage_controller
|
||||
@@ -1517,7 +1518,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
|
||||
safekeepers_generation,
|
||||
safekeepers,
|
||||
pageservers,
|
||||
remote_ext_config.as_ref(),
|
||||
remote_ext_base_url.as_ref(),
|
||||
stripe_size.0 as usize,
|
||||
args.create_test_user,
|
||||
args.start_timeout,
|
||||
|
||||
@@ -655,7 +655,7 @@ impl Endpoint {
|
||||
safekeepers_generation: Option<SafekeeperGeneration>,
|
||||
safekeepers: Vec<NodeId>,
|
||||
pageservers: Vec<(Host, u16)>,
|
||||
remote_ext_config: Option<&String>,
|
||||
remote_ext_base_url: Option<&String>,
|
||||
shard_stripe_size: usize,
|
||||
create_test_user: bool,
|
||||
start_timeout: Duration,
|
||||
@@ -825,8 +825,8 @@ impl Endpoint {
|
||||
.stderr(logfile.try_clone()?)
|
||||
.stdout(logfile);
|
||||
|
||||
if let Some(remote_ext_config) = remote_ext_config {
|
||||
cmd.args(["--remote-ext-config", remote_ext_config]);
|
||||
if let Some(remote_ext_base_url) = remote_ext_base_url {
|
||||
cmd.args(["--remote-ext-base-url", remote_ext_base_url]);
|
||||
}
|
||||
|
||||
let child = cmd.spawn()?;
|
||||
|
||||
@@ -1277,6 +1277,8 @@ impl Timeline {
|
||||
return Ok(CompactionOutcome::YieldForL0);
|
||||
}
|
||||
|
||||
let gc_cutoff = *self.applied_gc_cutoff_lsn.read();
|
||||
|
||||
// 2. Repartition and create image layers if necessary
|
||||
match self
|
||||
.repartition(
|
||||
@@ -1287,7 +1289,7 @@ impl Timeline {
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
|
||||
Ok(((dense_partitioning, sparse_partitioning), lsn)) if lsn >= gc_cutoff => {
|
||||
// Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
|
||||
let image_ctx = RequestContextBuilder::from(ctx)
|
||||
.access_stats_behavior(AccessStatsBehavior::Skip)
|
||||
@@ -1341,6 +1343,10 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(_) => {
|
||||
info!("skipping repartitioning due to image compaction LSN being below GC cutoff");
|
||||
}
|
||||
|
||||
// Suppress errors when cancelled.
|
||||
Err(_) if self.cancel.is_cancelled() => {}
|
||||
Err(err) if err.is_cancel() => {}
|
||||
@@ -3606,6 +3612,13 @@ impl Timeline {
|
||||
last_key = Some(key);
|
||||
}
|
||||
accumulated_values.push((key, lsn, val));
|
||||
|
||||
if accumulated_values.len() >= 65536 {
|
||||
// Assume all of them are images, that would be 512MB of data in memory for a single key.
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"too many values for a single key, giving up gc-compaction"
|
||||
)));
|
||||
}
|
||||
} else {
|
||||
let last_key: &mut Key = last_key.as_mut().unwrap();
|
||||
stat.on_unique_key_visited(); // TODO: adjust statistics for partial compaction
|
||||
|
||||
@@ -425,15 +425,12 @@ compact_prefetch_buffers(void)
|
||||
* point inside and outside PostgreSQL.
|
||||
*
|
||||
* This still does throw errors when it receives malformed responses from PS.
|
||||
*
|
||||
* When we're not called from CHECK_FOR_INTERRUPTS (indicated by
|
||||
* IsHandlingInterrupts) we also report we've ended prefetch receive work,
|
||||
* just in case state tracking was lost due to an error in the sync getPage
|
||||
* response code.
|
||||
*/
|
||||
void
|
||||
communicator_prefetch_pump_state(bool IsHandlingInterrupts)
|
||||
communicator_prefetch_pump_state(void)
|
||||
{
|
||||
START_PREFETCH_RECEIVE_WORK();
|
||||
|
||||
while (MyPState->ring_receive != MyPState->ring_flush)
|
||||
{
|
||||
NeonResponse *response;
|
||||
@@ -482,9 +479,7 @@ communicator_prefetch_pump_state(bool IsHandlingInterrupts)
|
||||
}
|
||||
}
|
||||
|
||||
/* We never pump the prefetch state while handling other pages */
|
||||
if (!IsHandlingInterrupts)
|
||||
END_PREFETCH_RECEIVE_WORK();
|
||||
END_PREFETCH_RECEIVE_WORK();
|
||||
|
||||
communicator_reconfigure_timeout_if_needed();
|
||||
}
|
||||
@@ -672,9 +667,10 @@ prefetch_wait_for(uint64 ring_index)
|
||||
|
||||
Assert(MyPState->ring_unused > ring_index);
|
||||
|
||||
START_PREFETCH_RECEIVE_WORK();
|
||||
|
||||
while (MyPState->ring_receive <= ring_index)
|
||||
{
|
||||
START_PREFETCH_RECEIVE_WORK();
|
||||
entry = GetPrfSlot(MyPState->ring_receive);
|
||||
|
||||
Assert(entry->status == PRFS_REQUESTED);
|
||||
@@ -683,17 +679,18 @@ prefetch_wait_for(uint64 ring_index)
|
||||
result = false;
|
||||
break;
|
||||
}
|
||||
|
||||
END_PREFETCH_RECEIVE_WORK();
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
}
|
||||
|
||||
if (result)
|
||||
{
|
||||
/* Check that slot is actually received (srver can be disconnected in prefetch_pump_state called from CHECK_FOR_INTERRUPTS */
|
||||
PrefetchRequest *slot = GetPrfSlot(ring_index);
|
||||
return slot->status == PRFS_RECEIVED;
|
||||
result = slot->status == PRFS_RECEIVED;
|
||||
}
|
||||
return false;
|
||||
END_PREFETCH_RECEIVE_WORK();
|
||||
|
||||
return result;
|
||||
;
|
||||
}
|
||||
|
||||
@@ -720,6 +717,7 @@ prefetch_read(PrefetchRequest *slot)
|
||||
Assert(slot->status == PRFS_REQUESTED);
|
||||
Assert(slot->response == NULL);
|
||||
Assert(slot->my_ring_index == MyPState->ring_receive);
|
||||
Assert(readpage_reentrant_guard);
|
||||
|
||||
if (slot->status != PRFS_REQUESTED ||
|
||||
slot->response != NULL ||
|
||||
@@ -802,6 +800,7 @@ communicator_prefetch_receive(BufferTag tag)
|
||||
PrfHashEntry *entry;
|
||||
PrefetchRequest hashkey;
|
||||
|
||||
Assert(readpage_reentrant_guard);
|
||||
hashkey.buftag = tag;
|
||||
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
|
||||
if (entry != NULL && prefetch_wait_for(entry->slot->my_ring_index))
|
||||
@@ -821,8 +820,12 @@ communicator_prefetch_receive(BufferTag tag)
|
||||
void
|
||||
prefetch_on_ps_disconnect(void)
|
||||
{
|
||||
bool save_readpage_reentrant_guard = readpage_reentrant_guard;
|
||||
MyPState->ring_flush = MyPState->ring_unused;
|
||||
|
||||
/* Prohibit callig of prefetch_pump_state */
|
||||
START_PREFETCH_RECEIVE_WORK();
|
||||
|
||||
while (MyPState->ring_receive < MyPState->ring_unused)
|
||||
{
|
||||
PrefetchRequest *slot;
|
||||
@@ -851,6 +854,9 @@ prefetch_on_ps_disconnect(void)
|
||||
MyNeonCounters->getpage_prefetch_discards_total += 1;
|
||||
}
|
||||
|
||||
/* Restore guard */
|
||||
readpage_reentrant_guard = save_readpage_reentrant_guard;
|
||||
|
||||
/*
|
||||
* We can have gone into retry due to network error, so update stats with
|
||||
* the latest available
|
||||
@@ -2509,7 +2515,7 @@ communicator_processinterrupts(void)
|
||||
if (timeout_signaled)
|
||||
{
|
||||
if (!readpage_reentrant_guard && readahead_getpage_pull_timeout_ms > 0)
|
||||
communicator_prefetch_pump_state(true);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
timeout_signaled = false;
|
||||
communicator_reconfigure_timeout_if_needed();
|
||||
|
||||
@@ -44,7 +44,7 @@ extern int communicator_read_slru_segment(SlruKind kind, int64 segno,
|
||||
void *buffer);
|
||||
|
||||
extern void communicator_reconfigure_timeout_if_needed(void);
|
||||
extern void communicator_prefetch_pump_state(bool IsHandlingInterrupts);
|
||||
extern void communicator_prefetch_pump_state(void);
|
||||
|
||||
|
||||
#endif
|
||||
|
||||
@@ -1179,7 +1179,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
blocknum += iterblocks;
|
||||
}
|
||||
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
return false;
|
||||
}
|
||||
@@ -1218,7 +1218,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
|
||||
|
||||
communicator_prefetch_register_bufferv(tag, NULL, 1, NULL);
|
||||
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
return false;
|
||||
}
|
||||
@@ -1262,7 +1262,7 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum,
|
||||
*/
|
||||
neon_log(SmgrTrace, "writeback noop");
|
||||
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
if (IS_LOCAL_REL(reln))
|
||||
@@ -1315,7 +1315,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
||||
}
|
||||
|
||||
/* Try to read PS results if they are available */
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1);
|
||||
|
||||
@@ -1339,7 +1339,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
||||
/*
|
||||
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
|
||||
*/
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
|
||||
@@ -1449,7 +1449,7 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
nblocks, PG_IOV_MAX);
|
||||
|
||||
/* Try to read PS results if they are available */
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum,
|
||||
request_lsns, nblocks);
|
||||
@@ -1480,7 +1480,7 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
/*
|
||||
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
|
||||
*/
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
if (forknum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
|
||||
@@ -1665,7 +1665,7 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
|
||||
|
||||
lfc_write(InfoFromSMgrRel(reln), forknum, blocknum, buffer);
|
||||
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
if (IS_LOCAL_REL(reln))
|
||||
@@ -1727,7 +1727,7 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
|
||||
|
||||
lfc_writev(InfoFromSMgrRel(reln), forknum, blkno, buffers, nblocks);
|
||||
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
if (IS_LOCAL_REL(reln))
|
||||
@@ -1902,7 +1902,7 @@ neon_immedsync(SMgrRelation reln, ForkNumber forknum)
|
||||
|
||||
neon_log(SmgrTrace, "[NEON_SMGR] immedsync noop");
|
||||
|
||||
communicator_prefetch_pump_state(false);
|
||||
communicator_prefetch_pump_state();
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
if (IS_LOCAL_REL(reln))
|
||||
|
||||
@@ -3886,10 +3886,10 @@ impl Service {
|
||||
|
||||
None
|
||||
} else if safekeepers {
|
||||
// Note that we do not support creating the timeline on the safekeepers
|
||||
// for imported timelines. The `start_lsn` of the timeline is not known
|
||||
// until the import finshes.
|
||||
// https://github.com/neondatabase/neon/issues/11569
|
||||
// Note that for imported timelines, we do not create the timeline on the safekeepers
|
||||
// straight away. Instead, we do it once the import finalized such that we know what
|
||||
// start LSN to provide for the safekeepers. This is done in
|
||||
// [`Self::finalize_timeline_import`].
|
||||
let res = self
|
||||
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info)
|
||||
.instrument(tracing::info_span!("timeline_create_safekeepers", %tenant_id, timeline_id=%timeline_info.timeline_id))
|
||||
@@ -3966,11 +3966,22 @@ impl Service {
|
||||
let active = self.timeline_active_on_all_shards(&import).await?;
|
||||
|
||||
match active {
|
||||
true => {
|
||||
Some(timeline_info) => {
|
||||
tracing::info!("Timeline became active on all shards");
|
||||
|
||||
if self.config.timelines_onto_safekeepers {
|
||||
// Now that we know the start LSN of this timeline, create it on the
|
||||
// safekeepers.
|
||||
self.tenant_timeline_create_safekeepers_until_success(
|
||||
import.tenant_id,
|
||||
timeline_info,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
false => {
|
||||
None => {
|
||||
tracing::info!("Timeline not active on all shards yet");
|
||||
|
||||
tokio::select! {
|
||||
@@ -4004,9 +4015,6 @@ impl Service {
|
||||
.range_mut(TenantShardId::tenant_range(import.tenant_id))
|
||||
.for_each(|(_id, shard)| shard.importing = TimelineImportState::Idle);
|
||||
|
||||
// TODO(vlad): Timeline creations in import mode do not return a correct initdb lsn,
|
||||
// so we can't create the timeline on the safekeepers. Fix by moving creation here.
|
||||
// https://github.com/neondatabase/neon/issues/11569
|
||||
tracing::info!(%import_failed, "Timeline import complete");
|
||||
|
||||
Ok(())
|
||||
@@ -4021,10 +4029,16 @@ impl Service {
|
||||
.await;
|
||||
}
|
||||
|
||||
/// If the timeline is active on all shards, returns the [`TimelineInfo`]
|
||||
/// collected from shard 0.
|
||||
///
|
||||
/// An error is returned if the shard layout has changed during the import.
|
||||
/// This is guarded against within the storage controller and the pageserver,
|
||||
/// and, therefore, unexpected.
|
||||
async fn timeline_active_on_all_shards(
|
||||
self: &Arc<Self>,
|
||||
import: &TimelineImport,
|
||||
) -> anyhow::Result<bool> {
|
||||
) -> anyhow::Result<Option<TimelineInfo>> {
|
||||
let targets = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let mut targets = Vec::new();
|
||||
@@ -4048,13 +4062,17 @@ impl Service {
|
||||
.expect("Pageservers may not be deleted while referenced");
|
||||
targets.push((*tenant_shard_id, node.clone()));
|
||||
} else {
|
||||
return Ok(false);
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
|
||||
targets
|
||||
};
|
||||
|
||||
if targets.is_empty() {
|
||||
anyhow::bail!("No shards found to finalize import for");
|
||||
}
|
||||
|
||||
let results = self
|
||||
.tenant_for_shards_api(
|
||||
targets,
|
||||
@@ -4070,10 +4088,17 @@ impl Service {
|
||||
)
|
||||
.await;
|
||||
|
||||
Ok(results.into_iter().all(|res| match res {
|
||||
let all_active = results.iter().all(|res| match res {
|
||||
Ok(info) => info.state == TimelineState::Active,
|
||||
Err(_) => false,
|
||||
}))
|
||||
});
|
||||
|
||||
if all_active {
|
||||
// Both unwraps are validated above
|
||||
Ok(Some(results.into_iter().next().unwrap().unwrap()))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_timeline_archival_config(
|
||||
|
||||
@@ -323,6 +323,42 @@ impl Service {
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_timeline_create_safekeepers_until_success(
|
||||
self: &Arc<Self>,
|
||||
tenant_id: TenantId,
|
||||
timeline_info: TimelineInfo,
|
||||
) -> anyhow::Result<()> {
|
||||
const BACKOFF: Duration = Duration::from_secs(5);
|
||||
|
||||
loop {
|
||||
if self.cancel.is_cancelled() {
|
||||
anyhow::bail!("Shut down requested while finalizing import");
|
||||
}
|
||||
|
||||
let res = self
|
||||
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info)
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(_) => {
|
||||
tracing::info!("Timeline created on safekeepers");
|
||||
break;
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!("Failed to create timeline on safekeepers: {err}");
|
||||
tokio::select! {
|
||||
_ = self.cancel.cancelled() => {
|
||||
anyhow::bail!("Shut down requested while finalizing import");
|
||||
},
|
||||
_ = tokio::time::sleep(BACKOFF) => {}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Directly insert the timeline into the database without reconciling it with safekeepers.
|
||||
///
|
||||
/// Useful if the timeline already exists on the specified safekeepers,
|
||||
|
||||
@@ -557,7 +557,7 @@ class NeonLocalCli(AbstractNeonCli):
|
||||
endpoint_id: str,
|
||||
safekeepers_generation: int | None = None,
|
||||
safekeepers: list[int] | None = None,
|
||||
remote_ext_config: str | None = None,
|
||||
remote_ext_base_url: str | None = None,
|
||||
pageserver_id: int | None = None,
|
||||
allow_multiple: bool = False,
|
||||
create_test_user: bool = False,
|
||||
@@ -572,8 +572,8 @@ class NeonLocalCli(AbstractNeonCli):
|
||||
extra_env_vars = env or {}
|
||||
if basebackup_request_tries is not None:
|
||||
extra_env_vars["NEON_COMPUTE_TESTING_BASEBACKUP_TRIES"] = str(basebackup_request_tries)
|
||||
if remote_ext_config is not None:
|
||||
args.extend(["--remote-ext-config", remote_ext_config])
|
||||
if remote_ext_base_url is not None:
|
||||
args.extend(["--remote-ext-base-url", remote_ext_base_url])
|
||||
|
||||
if safekeepers_generation is not None:
|
||||
args.extend(["--safekeepers-generation", str(safekeepers_generation)])
|
||||
|
||||
@@ -4195,7 +4195,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
|
||||
def start(
|
||||
self,
|
||||
remote_ext_config: str | None = None,
|
||||
remote_ext_base_url: str | None = None,
|
||||
pageserver_id: int | None = None,
|
||||
safekeeper_generation: int | None = None,
|
||||
safekeepers: list[int] | None = None,
|
||||
@@ -4221,7 +4221,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
self.endpoint_id,
|
||||
safekeepers_generation=safekeeper_generation,
|
||||
safekeepers=self.active_safekeepers,
|
||||
remote_ext_config=remote_ext_config,
|
||||
remote_ext_base_url=remote_ext_base_url,
|
||||
pageserver_id=pageserver_id,
|
||||
allow_multiple=allow_multiple,
|
||||
create_test_user=create_test_user,
|
||||
@@ -4436,7 +4436,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
hot_standby: bool = False,
|
||||
lsn: Lsn | None = None,
|
||||
config_lines: list[str] | None = None,
|
||||
remote_ext_config: str | None = None,
|
||||
remote_ext_base_url: str | None = None,
|
||||
pageserver_id: int | None = None,
|
||||
allow_multiple: bool = False,
|
||||
basebackup_request_tries: int | None = None,
|
||||
@@ -4455,7 +4455,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
pageserver_id=pageserver_id,
|
||||
allow_multiple=allow_multiple,
|
||||
).start(
|
||||
remote_ext_config=remote_ext_config,
|
||||
remote_ext_base_url=remote_ext_base_url,
|
||||
pageserver_id=pageserver_id,
|
||||
allow_multiple=allow_multiple,
|
||||
basebackup_request_tries=basebackup_request_tries,
|
||||
@@ -4539,7 +4539,7 @@ class EndpointFactory:
|
||||
lsn: Lsn | None = None,
|
||||
hot_standby: bool = False,
|
||||
config_lines: list[str] | None = None,
|
||||
remote_ext_config: str | None = None,
|
||||
remote_ext_base_url: str | None = None,
|
||||
pageserver_id: int | None = None,
|
||||
basebackup_request_tries: int | None = None,
|
||||
) -> Endpoint:
|
||||
@@ -4559,7 +4559,7 @@ class EndpointFactory:
|
||||
hot_standby=hot_standby,
|
||||
config_lines=config_lines,
|
||||
lsn=lsn,
|
||||
remote_ext_config=remote_ext_config,
|
||||
remote_ext_base_url=remote_ext_base_url,
|
||||
pageserver_id=pageserver_id,
|
||||
basebackup_request_tries=basebackup_request_tries,
|
||||
)
|
||||
@@ -4613,7 +4613,10 @@ class EndpointFactory:
|
||||
return self
|
||||
|
||||
def new_replica(
|
||||
self, origin: Endpoint, endpoint_id: str, config_lines: list[str] | None = None
|
||||
self,
|
||||
origin: Endpoint,
|
||||
endpoint_id: str | None = None,
|
||||
config_lines: list[str] | None = None,
|
||||
):
|
||||
branch_name = origin.branch_name
|
||||
assert origin in self.endpoints
|
||||
@@ -4629,7 +4632,10 @@ class EndpointFactory:
|
||||
)
|
||||
|
||||
def new_replica_start(
|
||||
self, origin: Endpoint, endpoint_id: str, config_lines: list[str] | None = None
|
||||
self,
|
||||
origin: Endpoint,
|
||||
endpoint_id: str | None = None,
|
||||
config_lines: list[str] | None = None,
|
||||
):
|
||||
branch_name = origin.branch_name
|
||||
assert origin in self.endpoints
|
||||
|
||||
@@ -221,7 +221,7 @@ def test_remote_extensions(
|
||||
|
||||
endpoint.create_remote_extension_spec(spec)
|
||||
|
||||
endpoint.start(remote_ext_config=extensions_endpoint)
|
||||
endpoint.start(remote_ext_base_url=extensions_endpoint)
|
||||
|
||||
with endpoint.connect() as conn:
|
||||
with conn.cursor() as cur:
|
||||
@@ -249,7 +249,7 @@ def test_remote_extensions(
|
||||
# Remove the extension files to force a redownload of the extension.
|
||||
extension.remove(test_output_dir, pg_version)
|
||||
|
||||
endpoint.start(remote_ext_config=extensions_endpoint)
|
||||
endpoint.start(remote_ext_base_url=extensions_endpoint)
|
||||
|
||||
# Test that ALTER EXTENSION UPDATE statements also fetch remote extensions.
|
||||
with endpoint.connect() as conn:
|
||||
|
||||
@@ -24,6 +24,7 @@ from fixtures.utils import (
|
||||
skip_in_debug_build,
|
||||
wait_until,
|
||||
)
|
||||
from fixtures.workload import Workload
|
||||
from mypy_boto3_kms import KMSClient
|
||||
from mypy_boto3_kms.type_defs import EncryptResponseTypeDef
|
||||
from mypy_boto3_s3 import S3Client
|
||||
@@ -97,6 +98,10 @@ def test_pgdata_import_smoke(
|
||||
f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
|
||||
)
|
||||
|
||||
if neon_env_builder.storage_controller_config is None:
|
||||
neon_env_builder.storage_controller_config = {}
|
||||
neon_env_builder.storage_controller_config["timelines_onto_safekeepers"] = True
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# The test needs LocalFs support, which is only built in testing mode.
|
||||
@@ -286,34 +291,28 @@ def test_pgdata_import_smoke(
|
||||
#
|
||||
# validate that we can write
|
||||
#
|
||||
rw_endpoint = env.endpoints.create_start(
|
||||
branch_name=import_branch_name,
|
||||
endpoint_id="rw",
|
||||
tenant_id=tenant_id,
|
||||
config_lines=ep_config,
|
||||
)
|
||||
rw_endpoint.safe_psql("create table othertable(values text)")
|
||||
rw_lsn = Lsn(rw_endpoint.safe_psql_scalar("select pg_current_wal_flush_lsn()"))
|
||||
workload = Workload(env, tenant_id, timeline_id, branch_name=import_branch_name)
|
||||
workload.init()
|
||||
workload.write_rows(64)
|
||||
workload.validate()
|
||||
|
||||
# TODO: consider using `class Workload` here
|
||||
# to do compaction and whatnot?
|
||||
rw_lsn = Lsn(workload.endpoint().safe_psql_scalar("select pg_current_wal_flush_lsn()"))
|
||||
|
||||
#
|
||||
# validate that we can branch (important use case)
|
||||
#
|
||||
|
||||
# ... at the tip
|
||||
_ = env.create_branch(
|
||||
child_timeline_id = env.create_branch(
|
||||
new_branch_name="br-tip",
|
||||
ancestor_branch_name=import_branch_name,
|
||||
tenant_id=tenant_id,
|
||||
ancestor_start_lsn=rw_lsn,
|
||||
)
|
||||
br_tip_endpoint = env.endpoints.create_start(
|
||||
branch_name="br-tip", endpoint_id="br-tip-ro", tenant_id=tenant_id, config_lines=ep_config
|
||||
)
|
||||
validate_vanilla_equivalence(br_tip_endpoint)
|
||||
br_tip_endpoint.safe_psql("select * from othertable")
|
||||
child_workload = workload.branch(timeline_id=child_timeline_id, branch_name="br-tip")
|
||||
child_workload.validate()
|
||||
|
||||
validate_vanilla_equivalence(child_workload.endpoint())
|
||||
|
||||
# ... at the initdb lsn
|
||||
_ = env.create_branch(
|
||||
@@ -330,7 +329,7 @@ def test_pgdata_import_smoke(
|
||||
)
|
||||
validate_vanilla_equivalence(br_initdb_endpoint)
|
||||
with pytest.raises(psycopg2.errors.UndefinedTable):
|
||||
br_initdb_endpoint.safe_psql("select * from othertable")
|
||||
br_initdb_endpoint.safe_psql(f"select * from {workload.table}")
|
||||
|
||||
|
||||
@run_only_on_default_postgres(reason="PG version is irrelevant here")
|
||||
|
||||
@@ -39,3 +39,10 @@ def test_role_grants(neon_simple_env: NeonEnv):
|
||||
res = cur.fetchall()
|
||||
|
||||
assert res == [(1,)], "select should not succeed"
|
||||
|
||||
# confirm that replicas can also ensure the grants are correctly set.
|
||||
replica = env.endpoints.new_replica_start(endpoint)
|
||||
replica_client = replica.http_client()
|
||||
replica_client.set_role_grants(
|
||||
"test_role_grants", "test_role", "test_schema", ["CREATE", "USAGE"]
|
||||
)
|
||||
|
||||
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 05ddf212e2...d72d76f2cd
2
vendor/revisions.json
vendored
2
vendor/revisions.json
vendored
@@ -5,7 +5,7 @@
|
||||
],
|
||||
"v16": [
|
||||
"16.8",
|
||||
"05ddf212e2e07b788b5c8b88bdcf98630941f6ae"
|
||||
"d72d76f2cdee4194dd052ce099e9784aca7c794a"
|
||||
],
|
||||
"v15": [
|
||||
"15.12",
|
||||
|
||||
Reference in New Issue
Block a user