mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-03 10:40:37 +00:00
Compare commits
5 Commits
mx_offset_
...
extension_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3d402f39e6 | ||
|
|
7e4b55a933 | ||
|
|
681ed9261e | ||
|
|
3ce678b3bb | ||
|
|
33f1bacfb7 |
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -892,12 +892,14 @@ dependencies = [
|
|||||||
"opentelemetry",
|
"opentelemetry",
|
||||||
"postgres",
|
"postgres",
|
||||||
"regex",
|
"regex",
|
||||||
|
"remote_storage",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"tar",
|
"tar",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-postgres",
|
"tokio-postgres",
|
||||||
|
"toml_edit",
|
||||||
"tracing",
|
"tracing",
|
||||||
"tracing-opentelemetry",
|
"tracing-opentelemetry",
|
||||||
"tracing-subscriber",
|
"tracing-subscriber",
|
||||||
@@ -965,6 +967,7 @@ dependencies = [
|
|||||||
"tar",
|
"tar",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
"toml",
|
"toml",
|
||||||
|
"tracing",
|
||||||
"url",
|
"url",
|
||||||
"utils",
|
"utils",
|
||||||
"workspace_hack",
|
"workspace_hack",
|
||||||
|
|||||||
@@ -30,3 +30,5 @@ url.workspace = true
|
|||||||
compute_api.workspace = true
|
compute_api.workspace = true
|
||||||
utils.workspace = true
|
utils.workspace = true
|
||||||
workspace_hack.workspace = true
|
workspace_hack.workspace = true
|
||||||
|
toml_edit.workspace = true
|
||||||
|
remote_storage = { version = "0.1", path = "../libs/remote_storage/" }
|
||||||
|
|||||||
@@ -5,6 +5,8 @@
|
|||||||
//! - `compute_ctl` accepts cluster (compute node) specification as a JSON file.
|
//! - `compute_ctl` accepts cluster (compute node) specification as a JSON file.
|
||||||
//! - Every start is a fresh start, so the data directory is removed and
|
//! - Every start is a fresh start, so the data directory is removed and
|
||||||
//! initialized again on each run.
|
//! initialized again on each run.
|
||||||
|
//! - If remote_extension_config is provided, it will be used to fetch extensions list
|
||||||
|
//! and download `shared_preload_libraries` from the remote storage.
|
||||||
//! - Next it will put configuration files into the `PGDATA` directory.
|
//! - Next it will put configuration files into the `PGDATA` directory.
|
||||||
//! - Sync safekeepers and get commit LSN.
|
//! - Sync safekeepers and get commit LSN.
|
||||||
//! - Get `basebackup` from pageserver using the returned on the previous step LSN.
|
//! - Get `basebackup` from pageserver using the returned on the previous step LSN.
|
||||||
@@ -27,7 +29,8 @@
|
|||||||
//! compute_ctl -D /var/db/postgres/compute \
|
//! compute_ctl -D /var/db/postgres/compute \
|
||||||
//! -C 'postgresql://cloud_admin@localhost/postgres' \
|
//! -C 'postgresql://cloud_admin@localhost/postgres' \
|
||||||
//! -S /var/db/postgres/specs/current.json \
|
//! -S /var/db/postgres/specs/current.json \
|
||||||
//! -b /usr/local/bin/postgres
|
//! -b /usr/local/bin/postgres \
|
||||||
|
//! -r {"bucket": "my-bucket", "region": "eu-central-1", "endpoint": "http:://localhost:9000"} \
|
||||||
//! ```
|
//! ```
|
||||||
//!
|
//!
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
@@ -35,7 +38,7 @@ use std::fs::File;
|
|||||||
use std::panic;
|
use std::panic;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::process::exit;
|
use std::process::exit;
|
||||||
use std::sync::{mpsc, Arc, Condvar, Mutex};
|
use std::sync::{mpsc, Arc, Condvar, Mutex, OnceLock};
|
||||||
use std::{thread, time::Duration};
|
use std::{thread, time::Duration};
|
||||||
|
|
||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
@@ -48,6 +51,8 @@ use compute_api::responses::ComputeStatus;
|
|||||||
|
|
||||||
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
|
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
|
||||||
use compute_tools::configurator::launch_configurator;
|
use compute_tools::configurator::launch_configurator;
|
||||||
|
use compute_tools::extension_server::launch_download_extensions;
|
||||||
|
use compute_tools::extension_server::{get_pg_version, init_remote_storage};
|
||||||
use compute_tools::http::api::launch_http_server;
|
use compute_tools::http::api::launch_http_server;
|
||||||
use compute_tools::logger::*;
|
use compute_tools::logger::*;
|
||||||
use compute_tools::monitor::launch_monitor;
|
use compute_tools::monitor::launch_monitor;
|
||||||
@@ -64,6 +69,14 @@ fn main() -> Result<()> {
|
|||||||
info!("build_tag: {build_tag}");
|
info!("build_tag: {build_tag}");
|
||||||
|
|
||||||
let matches = cli().get_matches();
|
let matches = cli().get_matches();
|
||||||
|
let pgbin_default = String::from("postgres");
|
||||||
|
let pgbin = matches.get_one::<String>("pgbin").unwrap_or(&pgbin_default);
|
||||||
|
|
||||||
|
let remote_ext_config = matches.get_one::<String>("remote-ext-config");
|
||||||
|
let ext_remote_storage = remote_ext_config.map(|x| {
|
||||||
|
init_remote_storage(x, build_tag)
|
||||||
|
.expect("cannot initialize remote extension storage from config")
|
||||||
|
});
|
||||||
|
|
||||||
let http_port = *matches
|
let http_port = *matches
|
||||||
.get_one::<u16>("http-port")
|
.get_one::<u16>("http-port")
|
||||||
@@ -128,9 +141,6 @@ fn main() -> Result<()> {
|
|||||||
let compute_id = matches.get_one::<String>("compute-id");
|
let compute_id = matches.get_one::<String>("compute-id");
|
||||||
let control_plane_uri = matches.get_one::<String>("control-plane-uri");
|
let control_plane_uri = matches.get_one::<String>("control-plane-uri");
|
||||||
|
|
||||||
// Try to use just 'postgres' if no path is provided
|
|
||||||
let pgbin = matches.get_one::<String>("pgbin").unwrap();
|
|
||||||
|
|
||||||
let spec;
|
let spec;
|
||||||
let mut live_config_allowed = false;
|
let mut live_config_allowed = false;
|
||||||
match spec_json {
|
match spec_json {
|
||||||
@@ -168,6 +178,7 @@ fn main() -> Result<()> {
|
|||||||
|
|
||||||
let mut new_state = ComputeState::new();
|
let mut new_state = ComputeState::new();
|
||||||
let spec_set;
|
let spec_set;
|
||||||
|
|
||||||
if let Some(spec) = spec {
|
if let Some(spec) = spec {
|
||||||
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
|
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
|
||||||
new_state.pspec = Some(pspec);
|
new_state.pspec = Some(pspec);
|
||||||
@@ -179,9 +190,13 @@ fn main() -> Result<()> {
|
|||||||
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
|
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
|
||||||
pgdata: pgdata.to_string(),
|
pgdata: pgdata.to_string(),
|
||||||
pgbin: pgbin.to_string(),
|
pgbin: pgbin.to_string(),
|
||||||
|
pgversion: get_pg_version(pgbin),
|
||||||
live_config_allowed,
|
live_config_allowed,
|
||||||
state: Mutex::new(new_state),
|
state: Mutex::new(new_state),
|
||||||
state_changed: Condvar::new(),
|
state_changed: Condvar::new(),
|
||||||
|
ext_remote_storage,
|
||||||
|
available_libraries: OnceLock::new(),
|
||||||
|
available_extensions: OnceLock::new(),
|
||||||
};
|
};
|
||||||
let compute = Arc::new(compute_node);
|
let compute = Arc::new(compute_node);
|
||||||
|
|
||||||
@@ -190,6 +205,8 @@ fn main() -> Result<()> {
|
|||||||
let _http_handle =
|
let _http_handle =
|
||||||
launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread");
|
launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread");
|
||||||
|
|
||||||
|
let extension_server_port: u16 = http_port;
|
||||||
|
|
||||||
if !spec_set {
|
if !spec_set {
|
||||||
// No spec provided, hang waiting for it.
|
// No spec provided, hang waiting for it.
|
||||||
info!("no compute spec provided, waiting");
|
info!("no compute spec provided, waiting");
|
||||||
@@ -227,10 +244,13 @@ fn main() -> Result<()> {
|
|||||||
let _configurator_handle =
|
let _configurator_handle =
|
||||||
launch_configurator(&compute).expect("cannot launch configurator thread");
|
launch_configurator(&compute).expect("cannot launch configurator thread");
|
||||||
|
|
||||||
|
let _download_extensions_handle =
|
||||||
|
launch_download_extensions(&compute).expect("cannot launch download extensions thread");
|
||||||
|
|
||||||
// Start Postgres
|
// Start Postgres
|
||||||
let mut delay_exit = false;
|
let mut delay_exit = false;
|
||||||
let mut exit_code = None;
|
let mut exit_code = None;
|
||||||
let pg = match compute.start_compute() {
|
let pg = match compute.start_compute(extension_server_port) {
|
||||||
Ok(pg) => Some(pg),
|
Ok(pg) => Some(pg),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!("could not start the compute node: {:?}", err);
|
error!("could not start the compute node: {:?}", err);
|
||||||
@@ -359,6 +379,12 @@ fn cli() -> clap::Command {
|
|||||||
.long("control-plane-uri")
|
.long("control-plane-uri")
|
||||||
.value_name("CONTROL_PLANE_API_BASE_URI"),
|
.value_name("CONTROL_PLANE_API_BASE_URI"),
|
||||||
)
|
)
|
||||||
|
.arg(
|
||||||
|
Arg::new("remote-ext-config")
|
||||||
|
.short('r')
|
||||||
|
.long("remote-ext-config")
|
||||||
|
.value_name("REMOTE_EXT_CONFIG"),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|||||||
@@ -1,13 +1,15 @@
|
|||||||
|
use std::collections::HashMap;
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use std::os::unix::fs::PermissionsExt;
|
use std::os::unix::fs::PermissionsExt;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::process::{Command, Stdio};
|
use std::process::{Command, Stdio};
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::{Condvar, Mutex};
|
use std::sync::{Condvar, Mutex, OnceLock};
|
||||||
|
|
||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use postgres::{Client, NoTls};
|
use postgres::{Client, NoTls};
|
||||||
|
use tokio;
|
||||||
use tokio_postgres;
|
use tokio_postgres;
|
||||||
use tracing::{info, instrument, warn};
|
use tracing::{info, instrument, warn};
|
||||||
use utils::id::{TenantId, TimelineId};
|
use utils::id::{TenantId, TimelineId};
|
||||||
@@ -16,9 +18,12 @@ use utils::lsn::Lsn;
|
|||||||
use compute_api::responses::{ComputeMetrics, ComputeStatus};
|
use compute_api::responses::{ComputeMetrics, ComputeStatus};
|
||||||
use compute_api::spec::{ComputeMode, ComputeSpec};
|
use compute_api::spec::{ComputeMode, ComputeSpec};
|
||||||
|
|
||||||
use crate::config;
|
use remote_storage::{GenericRemoteStorage, RemotePath};
|
||||||
|
|
||||||
|
use crate::extension_server::PathAndFlag;
|
||||||
use crate::pg_helpers::*;
|
use crate::pg_helpers::*;
|
||||||
use crate::spec::*;
|
use crate::spec::*;
|
||||||
|
use crate::{config, extension_server};
|
||||||
|
|
||||||
/// Compute node info shared across several `compute_ctl` threads.
|
/// Compute node info shared across several `compute_ctl` threads.
|
||||||
pub struct ComputeNode {
|
pub struct ComputeNode {
|
||||||
@@ -26,6 +31,7 @@ pub struct ComputeNode {
|
|||||||
pub connstr: url::Url,
|
pub connstr: url::Url,
|
||||||
pub pgdata: String,
|
pub pgdata: String,
|
||||||
pub pgbin: String,
|
pub pgbin: String,
|
||||||
|
pub pgversion: String,
|
||||||
/// We should only allow live re- / configuration of the compute node if
|
/// We should only allow live re- / configuration of the compute node if
|
||||||
/// it uses 'pull model', i.e. it can go to control-plane and fetch
|
/// it uses 'pull model', i.e. it can go to control-plane and fetch
|
||||||
/// the latest configuration. Otherwise, there could be a case:
|
/// the latest configuration. Otherwise, there could be a case:
|
||||||
@@ -45,6 +51,11 @@ pub struct ComputeNode {
|
|||||||
pub state: Mutex<ComputeState>,
|
pub state: Mutex<ComputeState>,
|
||||||
/// `Condvar` to allow notifying waiters about state changes.
|
/// `Condvar` to allow notifying waiters about state changes.
|
||||||
pub state_changed: Condvar,
|
pub state_changed: Condvar,
|
||||||
|
/// the S3 bucket that we search for extensions in
|
||||||
|
pub ext_remote_storage: Option<GenericRemoteStorage>,
|
||||||
|
// cached lists of available extensions and libraries
|
||||||
|
pub available_libraries: OnceLock<HashMap<String, Vec<RemotePath>>>,
|
||||||
|
pub available_extensions: OnceLock<HashMap<String, Vec<PathAndFlag>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
@@ -323,14 +334,22 @@ impl ComputeNode {
|
|||||||
/// Do all the preparations like PGDATA directory creation, configuration,
|
/// Do all the preparations like PGDATA directory creation, configuration,
|
||||||
/// safekeepers sync, basebackup, etc.
|
/// safekeepers sync, basebackup, etc.
|
||||||
#[instrument(skip_all)]
|
#[instrument(skip_all)]
|
||||||
pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
|
pub fn prepare_pgdata(
|
||||||
|
&self,
|
||||||
|
compute_state: &ComputeState,
|
||||||
|
extension_server_port: u16,
|
||||||
|
) -> Result<()> {
|
||||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||||
let spec = &pspec.spec;
|
let spec = &pspec.spec;
|
||||||
let pgdata_path = Path::new(&self.pgdata);
|
let pgdata_path = Path::new(&self.pgdata);
|
||||||
|
|
||||||
// Remove/create an empty pgdata directory and put configuration there.
|
// Remove/create an empty pgdata directory and put configuration there.
|
||||||
self.create_pgdata()?;
|
self.create_pgdata()?;
|
||||||
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &pspec.spec)?;
|
config::write_postgres_conf(
|
||||||
|
&pgdata_path.join("postgresql.conf"),
|
||||||
|
&pspec.spec,
|
||||||
|
Some(extension_server_port),
|
||||||
|
)?;
|
||||||
|
|
||||||
// Syncing safekeepers is only safe with primary nodes: if a primary
|
// Syncing safekeepers is only safe with primary nodes: if a primary
|
||||||
// is already connected it will be kicked out, so a secondary (standby)
|
// is already connected it will be kicked out, so a secondary (standby)
|
||||||
@@ -472,7 +491,7 @@ impl ComputeNode {
|
|||||||
|
|
||||||
// Write new config
|
// Write new config
|
||||||
let pgdata_path = Path::new(&self.pgdata);
|
let pgdata_path = Path::new(&self.pgdata);
|
||||||
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec)?;
|
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec, None)?;
|
||||||
|
|
||||||
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
|
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
|
||||||
self.pg_reload_conf(&mut client)?;
|
self.pg_reload_conf(&mut client)?;
|
||||||
@@ -502,7 +521,7 @@ impl ComputeNode {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(skip_all)]
|
#[instrument(skip_all)]
|
||||||
pub fn start_compute(&self) -> Result<std::process::Child> {
|
pub fn start_compute(&self, extension_server_port: u16) -> Result<std::process::Child> {
|
||||||
let compute_state = self.state.lock().unwrap().clone();
|
let compute_state = self.state.lock().unwrap().clone();
|
||||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||||
info!(
|
info!(
|
||||||
@@ -513,7 +532,27 @@ impl ComputeNode {
|
|||||||
pspec.timeline_id,
|
pspec.timeline_id,
|
||||||
);
|
);
|
||||||
|
|
||||||
self.prepare_pgdata(&compute_state)?;
|
// This part is sync, because we need to download
|
||||||
|
// remote shared_preload_libraries before postgres start (if any)
|
||||||
|
let library_load_start_time = Utc::now();
|
||||||
|
{
|
||||||
|
self.prepare_extenal_libraries(&compute_state)?;
|
||||||
|
|
||||||
|
let library_load_time = Utc::now()
|
||||||
|
.signed_duration_since(library_load_start_time)
|
||||||
|
.to_std()
|
||||||
|
.unwrap()
|
||||||
|
.as_millis() as u64;
|
||||||
|
|
||||||
|
let mut state = self.state.lock().unwrap();
|
||||||
|
state.metrics.load_libraries_ms = library_load_time;
|
||||||
|
info!(
|
||||||
|
"Loading shared_preload_libraries took {:?}ms",
|
||||||
|
library_load_time
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.prepare_pgdata(&compute_state, extension_server_port)?;
|
||||||
|
|
||||||
let start_time = Utc::now();
|
let start_time = Utc::now();
|
||||||
|
|
||||||
@@ -649,4 +688,150 @@ LIMIT 100",
|
|||||||
"{{\"pg_stat_statements\": []}}".to_string()
|
"{{\"pg_stat_statements\": []}}".to_string()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If remote extension storage is configured,
|
||||||
|
// download shared preload libraries.
|
||||||
|
#[tokio::main]
|
||||||
|
pub async fn prepare_extenal_libraries(&self, compute_state: &ComputeState) -> Result<()> {
|
||||||
|
if let Some(ref ext_remote_storage) = self.ext_remote_storage {
|
||||||
|
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||||
|
// download preload shared libraries before postgres start (if any)
|
||||||
|
let spec = &pspec.spec;
|
||||||
|
|
||||||
|
// 1. parse custom extension paths from spec
|
||||||
|
let custom_ext_prefixes = match &spec.custom_extensions {
|
||||||
|
Some(custom_extensions) => custom_extensions.clone(),
|
||||||
|
None => Vec::new(),
|
||||||
|
};
|
||||||
|
|
||||||
|
info!("custom_ext_prefixes: {:?}", &custom_ext_prefixes);
|
||||||
|
|
||||||
|
// parse shared_preload_libraries from spec
|
||||||
|
let mut libs_vec = Vec::new();
|
||||||
|
|
||||||
|
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
|
||||||
|
libs_vec = libs
|
||||||
|
.split(&[',', '\'', ' '])
|
||||||
|
.filter(|s| *s != "neon" && !s.is_empty())
|
||||||
|
.map(str::to_string)
|
||||||
|
.collect();
|
||||||
|
}
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"shared_preload_libraries parsed from spec.cluster.settings: {:?}",
|
||||||
|
libs_vec
|
||||||
|
);
|
||||||
|
|
||||||
|
// also parse shared_preload_libraries from provided postgresql.conf
|
||||||
|
// that is used in neon_local and python tests
|
||||||
|
if let Some(conf) = &spec.cluster.postgresql_conf {
|
||||||
|
let conf_lines = conf.split('\n').collect::<Vec<&str>>();
|
||||||
|
|
||||||
|
let mut shared_preload_libraries_line = "";
|
||||||
|
for line in conf_lines {
|
||||||
|
if line.starts_with("shared_preload_libraries") {
|
||||||
|
shared_preload_libraries_line = line;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut preload_libs_vec = Vec::new();
|
||||||
|
if let Some(libs) = shared_preload_libraries_line.split("='").nth(1) {
|
||||||
|
preload_libs_vec = libs
|
||||||
|
.split(&[',', '\'', ' '])
|
||||||
|
.filter(|s| *s != "neon" && !s.is_empty())
|
||||||
|
.map(str::to_string)
|
||||||
|
.collect();
|
||||||
|
}
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"shared_preload_libraries parsed from spec.cluster.postgresql_conf: {:?}",
|
||||||
|
preload_libs_vec
|
||||||
|
);
|
||||||
|
|
||||||
|
libs_vec.extend(preload_libs_vec);
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("Libraries to download: {:?}", &libs_vec);
|
||||||
|
// download shared_preload_libraries
|
||||||
|
let available_libraries = extension_server::get_available_libraries(
|
||||||
|
ext_remote_storage,
|
||||||
|
&self.pgbin,
|
||||||
|
&self.pgversion,
|
||||||
|
&custom_ext_prefixes,
|
||||||
|
&libs_vec,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
self.available_libraries
|
||||||
|
.set(available_libraries)
|
||||||
|
.expect("available_libraries.set error");
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// If remote extension storage is configured,
|
||||||
|
// download extension control files
|
||||||
|
#[tokio::main]
|
||||||
|
pub async fn prepare_external_extensions(&self, compute_state: &ComputeState) -> Result<()> {
|
||||||
|
if let Some(ref ext_remote_storage) = self.ext_remote_storage {
|
||||||
|
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||||
|
let spec = &pspec.spec;
|
||||||
|
|
||||||
|
// 1. parse custom extension paths from spec
|
||||||
|
let custom_ext_prefixes = match &spec.custom_extensions {
|
||||||
|
Some(custom_extensions) => custom_extensions.clone(),
|
||||||
|
None => Vec::new(),
|
||||||
|
};
|
||||||
|
|
||||||
|
info!("custom_ext_prefixes: {:?}", &custom_ext_prefixes);
|
||||||
|
|
||||||
|
// download extension control files
|
||||||
|
let available_extensions = extension_server::get_available_extensions(
|
||||||
|
ext_remote_storage,
|
||||||
|
&self.pgbin,
|
||||||
|
&self.pgversion,
|
||||||
|
&custom_ext_prefixes,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
self.available_extensions
|
||||||
|
.set(available_extensions)
|
||||||
|
.expect("available_extensions.set error");
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn download_extension_files(&self, filename: String) -> Result<()> {
|
||||||
|
match &self.ext_remote_storage {
|
||||||
|
None => anyhow::bail!("No remote extension storage"),
|
||||||
|
Some(remote_storage) => {
|
||||||
|
extension_server::download_extension_files(
|
||||||
|
&filename,
|
||||||
|
remote_storage,
|
||||||
|
&self.pgbin,
|
||||||
|
self.available_extensions
|
||||||
|
.get()
|
||||||
|
.context("available_extensions broke")?,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn download_library_file(&self, filename: String) -> Result<()> {
|
||||||
|
match &self.ext_remote_storage {
|
||||||
|
None => anyhow::bail!("No remote extension storage"),
|
||||||
|
Some(remote_storage) => {
|
||||||
|
extension_server::download_library_file(
|
||||||
|
&filename,
|
||||||
|
remote_storage,
|
||||||
|
&self.pgbin,
|
||||||
|
self.available_libraries
|
||||||
|
.get()
|
||||||
|
.context("available_libraries broke")?,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -33,7 +33,11 @@ pub fn line_in_file(path: &Path, line: &str) -> Result<bool> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Create or completely rewrite configuration file specified by `path`
|
/// Create or completely rewrite configuration file specified by `path`
|
||||||
pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> {
|
pub fn write_postgres_conf(
|
||||||
|
path: &Path,
|
||||||
|
spec: &ComputeSpec,
|
||||||
|
extension_server_port: Option<u16>,
|
||||||
|
) -> Result<()> {
|
||||||
// File::create() destroys the file content if it exists.
|
// File::create() destroys the file content if it exists.
|
||||||
let mut file = File::create(path)?;
|
let mut file = File::create(path)?;
|
||||||
|
|
||||||
@@ -95,5 +99,9 @@ pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> {
|
|||||||
writeln!(file, "# Managed by compute_ctl: end")?;
|
writeln!(file, "# Managed by compute_ctl: end")?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(port) = extension_server_port {
|
||||||
|
writeln!(file, "neon.extension_server_port={}", port)?;
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -42,13 +42,15 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn launch_configurator(compute: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
|
pub fn launch_configurator(
|
||||||
|
compute: &Arc<ComputeNode>,
|
||||||
|
) -> Result<thread::JoinHandle<()>, std::io::Error> {
|
||||||
let compute = Arc::clone(compute);
|
let compute = Arc::clone(compute);
|
||||||
|
|
||||||
Ok(thread::Builder::new()
|
thread::Builder::new()
|
||||||
.name("compute-configurator".into())
|
.name("compute-configurator".into())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
configurator_main_loop(&compute);
|
configurator_main_loop(&compute);
|
||||||
info!("configurator thread is exited");
|
info!("configurator thread is exited");
|
||||||
})?)
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
447
compute_tools/src/extension_server.rs
Normal file
447
compute_tools/src/extension_server.rs
Normal file
@@ -0,0 +1,447 @@
|
|||||||
|
// Download extension files from the extension store
|
||||||
|
// and put them in the right place in the postgres directory
|
||||||
|
use crate::compute::ComputeNode;
|
||||||
|
use anyhow::{self, bail, Context, Result};
|
||||||
|
use futures::future::join_all;
|
||||||
|
use remote_storage::*;
|
||||||
|
use serde_json::{self, Value};
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::fs::File;
|
||||||
|
use std::io::{BufWriter, Write};
|
||||||
|
use std::num::{NonZeroU32, NonZeroUsize};
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
|
use std::str;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::thread;
|
||||||
|
use tokio::io::AsyncReadExt;
|
||||||
|
use tracing::info;
|
||||||
|
|
||||||
|
// remote!
|
||||||
|
const SHARE_EXT_PATH: &str = "share/extension";
|
||||||
|
|
||||||
|
fn pass_any_error(results: Vec<Result<()>>) -> Result<()> {
|
||||||
|
for result in results {
|
||||||
|
result?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_pg_config(argument: &str, pgbin: &str) -> String {
|
||||||
|
// gives the result of `pg_config [argument]`
|
||||||
|
// where argument is a flag like `--version` or `--sharedir`
|
||||||
|
let pgconfig = pgbin.replace("postgres", "pg_config");
|
||||||
|
let config_output = std::process::Command::new(pgconfig)
|
||||||
|
.arg(argument)
|
||||||
|
.output()
|
||||||
|
.expect("pg_config error");
|
||||||
|
std::str::from_utf8(&config_output.stdout)
|
||||||
|
.expect("pg_config error")
|
||||||
|
.trim()
|
||||||
|
.to_string()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_pg_version(pgbin: &str) -> String {
|
||||||
|
// pg_config --version returns a (platform specific) human readable string
|
||||||
|
// such as "PostgreSQL 15.4". We parse this to v14/v15
|
||||||
|
let human_version = get_pg_config("--version", pgbin);
|
||||||
|
if human_version.contains("15") {
|
||||||
|
return "v15".to_string();
|
||||||
|
} else if human_version.contains("14") {
|
||||||
|
return "v14".to_string();
|
||||||
|
}
|
||||||
|
panic!("Unsuported postgres version {human_version}");
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn download_helper(
|
||||||
|
remote_storage: &GenericRemoteStorage,
|
||||||
|
remote_from_path: RemotePath,
|
||||||
|
sub_directory: Option<&str>,
|
||||||
|
download_location: &Path,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
// downloads file at remote_from_path to
|
||||||
|
// `download_location/[optional: subdirectory]/[remote_storage.object_name()]`
|
||||||
|
// Note: the subdirectory commmand is needed when there is an extension that
|
||||||
|
// depends on files in a subdirectory.
|
||||||
|
// For example, v14/share/extension/some_ext.control
|
||||||
|
// might depend on v14/share/extension/some_ext/some_ext--1.1.0.sql
|
||||||
|
// and v14/share/extension/some_ext/xxx.csv
|
||||||
|
// Note: it is the caller's responsibility to create the appropriate subdirectory
|
||||||
|
|
||||||
|
let local_path = match sub_directory {
|
||||||
|
Some(subdir) => download_location
|
||||||
|
.join(subdir)
|
||||||
|
.join(remote_from_path.object_name().expect("bad object")),
|
||||||
|
None => download_location.join(remote_from_path.object_name().expect("bad object")),
|
||||||
|
};
|
||||||
|
if local_path.exists() {
|
||||||
|
info!("File {:?} already exists. Skipping download", &local_path);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
info!(
|
||||||
|
"Downloading {:?} to location {:?}",
|
||||||
|
&remote_from_path, &local_path
|
||||||
|
);
|
||||||
|
let mut download = remote_storage.download(&remote_from_path).await?;
|
||||||
|
let mut write_data_buffer = Vec::new();
|
||||||
|
download
|
||||||
|
.download_stream
|
||||||
|
.read_to_end(&mut write_data_buffer)
|
||||||
|
.await?;
|
||||||
|
let mut output_file = BufWriter::new(File::create(local_path)?);
|
||||||
|
output_file.write_all(&write_data_buffer)?;
|
||||||
|
info!("Download {:?} completed successfully", &remote_from_path);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// download extension control files
|
||||||
|
//
|
||||||
|
// if custom_ext_prefixes is provided - search also in custom extension paths
|
||||||
|
//
|
||||||
|
pub async fn get_available_extensions(
|
||||||
|
remote_storage: &GenericRemoteStorage,
|
||||||
|
pgbin: &str,
|
||||||
|
pg_version: &str,
|
||||||
|
custom_ext_prefixes: &Vec<String>,
|
||||||
|
) -> anyhow::Result<HashMap<String, Vec<PathAndFlag>>> {
|
||||||
|
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
|
||||||
|
|
||||||
|
// public path, plus any private paths to download extensions from
|
||||||
|
let mut paths: Vec<RemotePath> = Vec::new();
|
||||||
|
paths.push(RemotePath::new(
|
||||||
|
&Path::new(pg_version).join(SHARE_EXT_PATH),
|
||||||
|
)?);
|
||||||
|
for custom_prefix in custom_ext_prefixes {
|
||||||
|
paths.push(RemotePath::new(
|
||||||
|
&Path::new(pg_version)
|
||||||
|
.join(custom_prefix)
|
||||||
|
.join(SHARE_EXT_PATH),
|
||||||
|
)?);
|
||||||
|
}
|
||||||
|
|
||||||
|
let (extension_files, control_files) =
|
||||||
|
organized_extension_files(remote_storage, &paths).await?;
|
||||||
|
|
||||||
|
let mut control_file_download_tasks = Vec::new();
|
||||||
|
// download all control files
|
||||||
|
for control_file in control_files {
|
||||||
|
control_file_download_tasks.push(download_helper(
|
||||||
|
remote_storage,
|
||||||
|
control_file.clone(),
|
||||||
|
None,
|
||||||
|
&local_sharedir,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
pass_any_error(join_all(control_file_download_tasks).await)?;
|
||||||
|
Ok(extension_files)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Download requested shared_preload_libraries
|
||||||
|
//
|
||||||
|
// Note that tenant_id is not optional here, because we only download libraries
|
||||||
|
// after we know the tenant spec and the tenant_id.
|
||||||
|
//
|
||||||
|
// return list of all library files to use it in the future searches
|
||||||
|
pub async fn get_available_libraries(
|
||||||
|
remote_storage: &GenericRemoteStorage,
|
||||||
|
pgbin: &str,
|
||||||
|
pg_version: &str,
|
||||||
|
custom_ext_prefixes: &Vec<String>,
|
||||||
|
preload_libraries: &Vec<String>,
|
||||||
|
) -> anyhow::Result<HashMap<String, Vec<RemotePath>>> {
|
||||||
|
// Construct a hashmap of all available libraries
|
||||||
|
// example (key, value) pair: test_lib0: [RemotePath(v14/lib/test_lib0.so), RemotePath(v14/lib/test_lib0.so.3)]
|
||||||
|
let mut paths: Vec<RemotePath> = Vec::new();
|
||||||
|
// public libraries
|
||||||
|
paths.push(
|
||||||
|
RemotePath::new(&Path::new(&pg_version).join("lib/"))
|
||||||
|
.expect("The hard coded path here is valid"),
|
||||||
|
);
|
||||||
|
// custom libraries
|
||||||
|
for custom_prefix in custom_ext_prefixes {
|
||||||
|
paths.push(
|
||||||
|
RemotePath::new(&Path::new(&pg_version).join(custom_prefix).join("lib"))
|
||||||
|
.expect("The hard coded path here is valid"),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
let all_available_libraries = organized_library_files(remote_storage, &paths).await?;
|
||||||
|
|
||||||
|
info!("list of library files {:?}", &all_available_libraries);
|
||||||
|
// download all requested libraries
|
||||||
|
let mut download_tasks = Vec::new();
|
||||||
|
for lib_name in preload_libraries {
|
||||||
|
download_tasks.push(download_library_file(
|
||||||
|
lib_name,
|
||||||
|
remote_storage,
|
||||||
|
pgbin,
|
||||||
|
&all_available_libraries,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
pass_any_error(join_all(download_tasks).await)?;
|
||||||
|
Ok(all_available_libraries)
|
||||||
|
}
|
||||||
|
|
||||||
|
// download all sqlfiles (and possibly data files) for a given extension name
|
||||||
|
//
|
||||||
|
pub async fn download_extension_files(
|
||||||
|
ext_name: &str,
|
||||||
|
remote_storage: &GenericRemoteStorage,
|
||||||
|
pgbin: &str,
|
||||||
|
all_available_files: &HashMap<String, Vec<PathAndFlag>>,
|
||||||
|
) -> Result<()> {
|
||||||
|
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
|
||||||
|
let mut downloaded_something = false;
|
||||||
|
let mut made_subdir = false;
|
||||||
|
|
||||||
|
info!("EXTENSION {:?}", ext_name);
|
||||||
|
info!("{:?}", all_available_files.get(ext_name));
|
||||||
|
|
||||||
|
info!("start download");
|
||||||
|
let mut download_tasks = Vec::new();
|
||||||
|
if let Some(files) = all_available_files.get(ext_name) {
|
||||||
|
info!("Downloading files for extension {:?}", &ext_name);
|
||||||
|
for path_and_flag in files {
|
||||||
|
let file = &path_and_flag.path;
|
||||||
|
let subdir_flag = path_and_flag.subdir_flag;
|
||||||
|
info!(
|
||||||
|
"--- Downloading {:?} (for {:?} as subdir? = {:?})",
|
||||||
|
&file, &ext_name, subdir_flag
|
||||||
|
);
|
||||||
|
let mut subdir = None;
|
||||||
|
if subdir_flag {
|
||||||
|
subdir = Some(ext_name);
|
||||||
|
if !made_subdir {
|
||||||
|
made_subdir = true;
|
||||||
|
std::fs::create_dir_all(local_sharedir.join(ext_name))?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
download_tasks.push(download_helper(
|
||||||
|
remote_storage,
|
||||||
|
file.clone(),
|
||||||
|
subdir,
|
||||||
|
&local_sharedir,
|
||||||
|
));
|
||||||
|
downloaded_something = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !downloaded_something {
|
||||||
|
bail!("Files for extension {ext_name} are not found in the extension store");
|
||||||
|
}
|
||||||
|
pass_any_error(join_all(download_tasks).await)?;
|
||||||
|
info!("finish download");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// appends an .so suffix to libname if it does not already have one
|
||||||
|
fn enforce_so_end(libname: &str) -> String {
|
||||||
|
if !libname.contains(".so") {
|
||||||
|
format!("{}.so", libname)
|
||||||
|
} else {
|
||||||
|
libname.to_string()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// download shared library file
|
||||||
|
pub async fn download_library_file(
|
||||||
|
lib_name: &str,
|
||||||
|
remote_storage: &GenericRemoteStorage,
|
||||||
|
pgbin: &str,
|
||||||
|
all_available_libraries: &HashMap<String, Vec<RemotePath>>,
|
||||||
|
) -> Result<()> {
|
||||||
|
let lib_name = get_library_name(lib_name);
|
||||||
|
let local_libdir: PathBuf = Path::new(&get_pg_config("--pkglibdir", pgbin)).into();
|
||||||
|
info!("looking for library {:?}", &lib_name);
|
||||||
|
match all_available_libraries.get(&*lib_name) {
|
||||||
|
Some(remote_paths) => {
|
||||||
|
let mut library_download_tasks = Vec::new();
|
||||||
|
for remote_path in remote_paths {
|
||||||
|
let file_path = local_libdir.join(remote_path.object_name().expect("bad object"));
|
||||||
|
if file_path.exists() {
|
||||||
|
info!("File {:?} already exists. Skipping download", &file_path);
|
||||||
|
} else {
|
||||||
|
library_download_tasks.push(download_helper(
|
||||||
|
remote_storage,
|
||||||
|
remote_path.clone(),
|
||||||
|
None,
|
||||||
|
&local_libdir,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pass_any_error(join_all(library_download_tasks).await)?;
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
// minor TODO: this logic seems to be somewhat faulty for .so.3 type files?
|
||||||
|
let lib_name_with_ext = enforce_so_end(&lib_name);
|
||||||
|
let file_path = local_libdir.join(lib_name_with_ext);
|
||||||
|
if file_path.exists() {
|
||||||
|
info!("File {:?} already exists. Skipping download", &file_path);
|
||||||
|
} else {
|
||||||
|
bail!("Library file {lib_name} not found")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// This function initializes the necessary structs to use remmote storage (should be fairly cheap)
|
||||||
|
pub fn init_remote_storage(
|
||||||
|
remote_ext_config: &str,
|
||||||
|
default_prefix: &str,
|
||||||
|
) -> anyhow::Result<GenericRemoteStorage> {
|
||||||
|
let remote_ext_config: serde_json::Value = serde_json::from_str(remote_ext_config)?;
|
||||||
|
|
||||||
|
let remote_ext_bucket = match &remote_ext_config["bucket"] {
|
||||||
|
Value::String(x) => x,
|
||||||
|
_ => bail!("remote_ext_config missing bucket"),
|
||||||
|
};
|
||||||
|
let remote_ext_region = match &remote_ext_config["region"] {
|
||||||
|
Value::String(x) => x,
|
||||||
|
_ => bail!("remote_ext_config missing region"),
|
||||||
|
};
|
||||||
|
let remote_ext_endpoint = match &remote_ext_config["endpoint"] {
|
||||||
|
Value::String(x) => Some(x.clone()),
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
let remote_ext_prefix = match &remote_ext_config["prefix"] {
|
||||||
|
Value::String(x) => Some(x.clone()),
|
||||||
|
// if prefix is not provided, use default, which is the build_tag
|
||||||
|
_ => Some(default_prefix.to_string()),
|
||||||
|
};
|
||||||
|
|
||||||
|
// load will not be large, so default parameters are fine
|
||||||
|
let config = S3Config {
|
||||||
|
bucket_name: remote_ext_bucket.to_string(),
|
||||||
|
bucket_region: remote_ext_region.to_string(),
|
||||||
|
prefix_in_bucket: remote_ext_prefix,
|
||||||
|
endpoint: remote_ext_endpoint,
|
||||||
|
concurrency_limit: NonZeroUsize::new(100).expect("100 != 0"),
|
||||||
|
max_keys_per_list_response: None,
|
||||||
|
};
|
||||||
|
let config = RemoteStorageConfig {
|
||||||
|
max_concurrent_syncs: NonZeroUsize::new(100).expect("100 != 0"),
|
||||||
|
max_sync_errors: NonZeroU32::new(100).expect("100 != 0"),
|
||||||
|
storage: RemoteStorageKind::AwsS3(config),
|
||||||
|
};
|
||||||
|
GenericRemoteStorage::from_config(&config)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_library_name(path: &str) -> String {
|
||||||
|
let path_suffix: Vec<&str> = path.split('/').collect();
|
||||||
|
let path_suffix = path_suffix.last().expect("bad ext name").to_string();
|
||||||
|
if let Some(index) = path_suffix.find(".so") {
|
||||||
|
return path_suffix[..index].to_string();
|
||||||
|
}
|
||||||
|
path_suffix
|
||||||
|
}
|
||||||
|
|
||||||
|
// asyncrounously lists files in all necessary directories
|
||||||
|
// TODO: potential optimization: do a single list files on the entire bucket
|
||||||
|
// and then filter out the files we don't need
|
||||||
|
async fn list_all_files(
|
||||||
|
remote_storage: &GenericRemoteStorage,
|
||||||
|
paths: &Vec<RemotePath>,
|
||||||
|
) -> Result<Vec<RemotePath>> {
|
||||||
|
let mut list_tasks = Vec::new();
|
||||||
|
let mut all_files = Vec::new();
|
||||||
|
for path in paths {
|
||||||
|
list_tasks.push(remote_storage.list_files(Some(path)));
|
||||||
|
}
|
||||||
|
for list_result in join_all(list_tasks).await {
|
||||||
|
all_files.extend(list_result?);
|
||||||
|
}
|
||||||
|
Ok(all_files)
|
||||||
|
}
|
||||||
|
|
||||||
|
// helper to collect all libraries, grouped by library name
|
||||||
|
// Returns a hashmap of (library name: [paths]})
|
||||||
|
// example entry: {libpgtypes: [libpgtypes.so.3, libpgtypes.so]}
|
||||||
|
async fn organized_library_files(
|
||||||
|
remote_storage: &GenericRemoteStorage,
|
||||||
|
paths: &Vec<RemotePath>,
|
||||||
|
) -> Result<HashMap<String, Vec<RemotePath>>> {
|
||||||
|
let mut library_groups = HashMap::new();
|
||||||
|
for file in list_all_files(remote_storage, paths).await? {
|
||||||
|
let lib_name = get_library_name(file.get_path().to_str().context("invalid path")?);
|
||||||
|
let lib_list = library_groups.entry(lib_name).or_insert(Vec::new());
|
||||||
|
lib_list.push(file.to_owned());
|
||||||
|
}
|
||||||
|
Ok(library_groups)
|
||||||
|
}
|
||||||
|
|
||||||
|
// store a path, paired with a flag indicating whether the path is to a file in
|
||||||
|
// the root or subdirectory
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct PathAndFlag {
|
||||||
|
path: RemotePath,
|
||||||
|
subdir_flag: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
// get_ext_name extracts the extension name, and returns a flag indicating
|
||||||
|
// whether this file is in a subdirectory or not.
|
||||||
|
//
|
||||||
|
// extension files can be in subdirectories of the extension store.
|
||||||
|
// examples of layout:
|
||||||
|
// v14//share//extension/extension_name--1.0.sql,
|
||||||
|
// v14//share//extension/extension_name/extension_name--1.0.sql,
|
||||||
|
// v14//share//extension/extension_name/extra_data.csv
|
||||||
|
// Note: we *assume* that the extension files is in one of these formats.
|
||||||
|
// If it is not, this code's behavior is *undefined*.
|
||||||
|
fn get_ext_name(path: &str) -> Result<(&str, bool)> {
|
||||||
|
let path_suffix: Vec<&str> = path.split(&format!("{SHARE_EXT_PATH}/")).collect();
|
||||||
|
let ext_name = path_suffix.last().expect("bad ext name");
|
||||||
|
|
||||||
|
if let Some(index) = ext_name.find('/') {
|
||||||
|
return Ok((&ext_name[..index], true));
|
||||||
|
} else if let Some(index) = ext_name.find("--") {
|
||||||
|
return Ok((&ext_name[..index], false));
|
||||||
|
}
|
||||||
|
Ok((ext_name, false))
|
||||||
|
}
|
||||||
|
|
||||||
|
// helper to collect files of given prefixes for extensions and group them by extension
|
||||||
|
// returns a hashmap of (extension_name, Vector of remote paths for all files needed for this extension)
|
||||||
|
// and a list of control files
|
||||||
|
// For example, an entry in the hashmap could be
|
||||||
|
// {"anon": [RemotePath("v14/anon/share/extension/anon/address.csv"),
|
||||||
|
// RemotePath("v14/anon/share/extension/anon/anon--1.1.0.sql")]},
|
||||||
|
// with corresponding list of control files entry being
|
||||||
|
// {"anon.control": RemotePath("v14/anon/share/extension/anon.control")}
|
||||||
|
async fn organized_extension_files(
|
||||||
|
remote_storage: &GenericRemoteStorage,
|
||||||
|
paths: &Vec<RemotePath>,
|
||||||
|
) -> Result<(HashMap<String, Vec<PathAndFlag>>, Vec<RemotePath>)> {
|
||||||
|
let mut grouped_dependencies = HashMap::new();
|
||||||
|
let mut control_files = Vec::new();
|
||||||
|
|
||||||
|
for file in list_all_files(remote_storage, paths).await? {
|
||||||
|
if file.extension().context("bad file name")? == "control" {
|
||||||
|
control_files.push(file.to_owned());
|
||||||
|
} else {
|
||||||
|
let (file_ext_name, subdir_flag) =
|
||||||
|
get_ext_name(file.get_path().to_str().context("invalid path")?)?;
|
||||||
|
let ext_file_list = grouped_dependencies
|
||||||
|
.entry(file_ext_name.to_string())
|
||||||
|
.or_insert(Vec::new());
|
||||||
|
ext_file_list.push(PathAndFlag {
|
||||||
|
path: file.to_owned(),
|
||||||
|
subdir_flag,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok((grouped_dependencies, control_files))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn launch_download_extensions(
|
||||||
|
compute: &Arc<ComputeNode>,
|
||||||
|
) -> Result<thread::JoinHandle<()>, std::io::Error> {
|
||||||
|
let compute = Arc::clone(compute);
|
||||||
|
thread::Builder::new()
|
||||||
|
.name("download-extensions".into())
|
||||||
|
.spawn(move || {
|
||||||
|
info!("start download_extension_files");
|
||||||
|
let compute_state = compute.state.lock().expect("error unlocking compute.state");
|
||||||
|
compute
|
||||||
|
.prepare_external_extensions(&compute_state)
|
||||||
|
.expect("error preparing extensions");
|
||||||
|
info!("download_extension_files done, exiting thread");
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -121,6 +121,55 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// download extension files from S3 on demand
|
||||||
|
(&Method::POST, route) if route.starts_with("/extension_server/") => {
|
||||||
|
info!("serving {:?} POST request", route);
|
||||||
|
info!("req.uri {:?}", req.uri());
|
||||||
|
|
||||||
|
let mut is_library = false;
|
||||||
|
|
||||||
|
if let Some(params) = req.uri().query() {
|
||||||
|
info!("serving {:?} POST request with params: {}", route, params);
|
||||||
|
|
||||||
|
if params == "is_library=true" {
|
||||||
|
is_library = true;
|
||||||
|
} else {
|
||||||
|
let mut resp = Response::new(Body::from("Wrong request parameters"));
|
||||||
|
*resp.status_mut() = StatusCode::BAD_REQUEST;
|
||||||
|
return resp;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let filename = route.split('/').last().unwrap().to_string();
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"serving /extension_server POST request, filename: {:?} is_library: {}",
|
||||||
|
filename, is_library
|
||||||
|
);
|
||||||
|
|
||||||
|
if is_library {
|
||||||
|
match compute.download_library_file(filename.to_string()).await {
|
||||||
|
Ok(_) => Response::new(Body::from("OK")),
|
||||||
|
Err(e) => {
|
||||||
|
error!("library download failed: {}", e);
|
||||||
|
let mut resp = Response::new(Body::from(e.to_string()));
|
||||||
|
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
||||||
|
resp
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
match compute.download_extension_files(filename.to_string()).await {
|
||||||
|
Ok(_) => Response::new(Body::from("OK")),
|
||||||
|
Err(e) => {
|
||||||
|
error!("extension download failed: {}", e);
|
||||||
|
let mut resp = Response::new(Body::from(e.to_string()));
|
||||||
|
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
||||||
|
resp
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Return the `404 Not Found` for any other routes.
|
// Return the `404 Not Found` for any other routes.
|
||||||
_ => {
|
_ => {
|
||||||
let mut not_found = Response::new(Body::from("404 Not Found"));
|
let mut not_found = Response::new(Body::from("404 Not Found"));
|
||||||
|
|||||||
@@ -139,6 +139,34 @@ paths:
|
|||||||
application/json:
|
application/json:
|
||||||
schema:
|
schema:
|
||||||
$ref: "#/components/schemas/GenericError"
|
$ref: "#/components/schemas/GenericError"
|
||||||
|
/extension_server:
|
||||||
|
post:
|
||||||
|
tags:
|
||||||
|
- Extension
|
||||||
|
summary: Download extension from S3 to local folder.
|
||||||
|
description: ""
|
||||||
|
operationId: downloadExtension
|
||||||
|
responses:
|
||||||
|
200:
|
||||||
|
description: Extension downloaded
|
||||||
|
content:
|
||||||
|
text/plain:
|
||||||
|
schema:
|
||||||
|
type: string
|
||||||
|
description: Error text or 'OK' if download succeeded.
|
||||||
|
example: "OK"
|
||||||
|
400:
|
||||||
|
description: Request is invalid.
|
||||||
|
content:
|
||||||
|
application/json:
|
||||||
|
schema:
|
||||||
|
$ref: "#/components/schemas/GenericError"
|
||||||
|
500:
|
||||||
|
description: Extension download request failed.
|
||||||
|
content:
|
||||||
|
application/json:
|
||||||
|
schema:
|
||||||
|
$ref: "#/components/schemas/GenericError"
|
||||||
|
|
||||||
components:
|
components:
|
||||||
securitySchemes:
|
securitySchemes:
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ pub mod http;
|
|||||||
#[macro_use]
|
#[macro_use]
|
||||||
pub mod logger;
|
pub mod logger;
|
||||||
pub mod compute;
|
pub mod compute;
|
||||||
|
pub mod extension_server;
|
||||||
pub mod monitor;
|
pub mod monitor;
|
||||||
pub mod params;
|
pub mod params;
|
||||||
pub mod pg_helpers;
|
pub mod pg_helpers;
|
||||||
|
|||||||
@@ -105,10 +105,10 @@ fn watch_compute_activity(compute: &ComputeNode) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Launch a separate compute monitor thread and return its `JoinHandle`.
|
/// Launch a separate compute monitor thread and return its `JoinHandle`.
|
||||||
pub fn launch_monitor(state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
|
pub fn launch_monitor(state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>, std::io::Error> {
|
||||||
let state = Arc::clone(state);
|
let state = Arc::clone(state);
|
||||||
|
|
||||||
Ok(thread::Builder::new()
|
thread::Builder::new()
|
||||||
.name("compute-monitor".into())
|
.name("compute-monitor".into())
|
||||||
.spawn(move || watch_compute_activity(&state))?)
|
.spawn(move || watch_compute_activity(&state))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -124,7 +124,7 @@ pub fn get_spec_from_control_plane(
|
|||||||
pub fn handle_configuration(spec: &ComputeSpec, pgdata_path: &Path) -> Result<()> {
|
pub fn handle_configuration(spec: &ComputeSpec, pgdata_path: &Path) -> Result<()> {
|
||||||
// File `postgresql.conf` is no longer included into `basebackup`, so just
|
// File `postgresql.conf` is no longer included into `basebackup`, so just
|
||||||
// always write all config into it creating new file.
|
// always write all config into it creating new file.
|
||||||
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec)?;
|
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec, None)?;
|
||||||
|
|
||||||
update_pg_hba(pgdata_path)?;
|
update_pg_hba(pgdata_path)?;
|
||||||
|
|
||||||
|
|||||||
@@ -32,3 +32,4 @@ utils.workspace = true
|
|||||||
|
|
||||||
compute_api.workspace = true
|
compute_api.workspace = true
|
||||||
workspace_hack.workspace = true
|
workspace_hack.workspace = true
|
||||||
|
tracing.workspace = true
|
||||||
|
|||||||
@@ -658,6 +658,8 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
|||||||
.get_one::<String>("endpoint_id")
|
.get_one::<String>("endpoint_id")
|
||||||
.ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
|
.ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
|
||||||
|
|
||||||
|
let remote_ext_config = sub_args.get_one::<String>("remote-ext-config");
|
||||||
|
|
||||||
// If --safekeepers argument is given, use only the listed safekeeper nodes.
|
// If --safekeepers argument is given, use only the listed safekeeper nodes.
|
||||||
let safekeepers =
|
let safekeepers =
|
||||||
if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
|
if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
|
||||||
@@ -699,7 +701,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
|||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
println!("Starting existing endpoint {endpoint_id}...");
|
println!("Starting existing endpoint {endpoint_id}...");
|
||||||
endpoint.start(&auth_token, safekeepers)?;
|
endpoint.start(&auth_token, safekeepers, remote_ext_config)?;
|
||||||
} else {
|
} else {
|
||||||
let branch_name = sub_args
|
let branch_name = sub_args
|
||||||
.get_one::<String>("branch-name")
|
.get_one::<String>("branch-name")
|
||||||
@@ -743,7 +745,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
|||||||
pg_version,
|
pg_version,
|
||||||
mode,
|
mode,
|
||||||
)?;
|
)?;
|
||||||
ep.start(&auth_token, safekeepers)?;
|
ep.start(&auth_token, safekeepers, remote_ext_config)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
"stop" => {
|
"stop" => {
|
||||||
@@ -1003,6 +1005,12 @@ fn cli() -> Command {
|
|||||||
.help("Additional pageserver's configuration options or overrides, refer to pageserver's 'config-override' CLI parameter docs for more")
|
.help("Additional pageserver's configuration options or overrides, refer to pageserver's 'config-override' CLI parameter docs for more")
|
||||||
.required(false);
|
.required(false);
|
||||||
|
|
||||||
|
let remote_ext_config_args = Arg::new("remote-ext-config")
|
||||||
|
.long("remote-ext-config")
|
||||||
|
.num_args(1)
|
||||||
|
.help("Configure the S3 bucket that we search for extensions in.")
|
||||||
|
.required(false);
|
||||||
|
|
||||||
let lsn_arg = Arg::new("lsn")
|
let lsn_arg = Arg::new("lsn")
|
||||||
.long("lsn")
|
.long("lsn")
|
||||||
.help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.")
|
.help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.")
|
||||||
@@ -1161,6 +1169,7 @@ fn cli() -> Command {
|
|||||||
.arg(pg_version_arg)
|
.arg(pg_version_arg)
|
||||||
.arg(hot_standby_arg)
|
.arg(hot_standby_arg)
|
||||||
.arg(safekeepers_arg)
|
.arg(safekeepers_arg)
|
||||||
|
.arg(remote_ext_config_args)
|
||||||
)
|
)
|
||||||
.subcommand(
|
.subcommand(
|
||||||
Command::new("stop")
|
Command::new("stop")
|
||||||
|
|||||||
@@ -311,7 +311,7 @@ impl Endpoint {
|
|||||||
|
|
||||||
// TODO: use future host field from safekeeper spec
|
// TODO: use future host field from safekeeper spec
|
||||||
// Pass the list of safekeepers to the replica so that it can connect to any of them,
|
// Pass the list of safekeepers to the replica so that it can connect to any of them,
|
||||||
// whichever is availiable.
|
// whichever is available.
|
||||||
let sk_ports = self
|
let sk_ports = self
|
||||||
.env
|
.env
|
||||||
.safekeepers
|
.safekeepers
|
||||||
@@ -418,7 +418,12 @@ impl Endpoint {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn start(&self, auth_token: &Option<String>, safekeepers: Vec<NodeId>) -> Result<()> {
|
pub fn start(
|
||||||
|
&self,
|
||||||
|
auth_token: &Option<String>,
|
||||||
|
safekeepers: Vec<NodeId>,
|
||||||
|
remote_ext_config: Option<&String>,
|
||||||
|
) -> Result<()> {
|
||||||
if self.status() == "running" {
|
if self.status() == "running" {
|
||||||
anyhow::bail!("The endpoint is already running");
|
anyhow::bail!("The endpoint is already running");
|
||||||
}
|
}
|
||||||
@@ -486,6 +491,13 @@ impl Endpoint {
|
|||||||
pageserver_connstring: Some(pageserver_connstring),
|
pageserver_connstring: Some(pageserver_connstring),
|
||||||
safekeeper_connstrings,
|
safekeeper_connstrings,
|
||||||
storage_auth_token: auth_token.clone(),
|
storage_auth_token: auth_token.clone(),
|
||||||
|
// TODO FIXME: This is a hack to test custom extensions locally.
|
||||||
|
// In test_download_extensions, we assume that the custom extension
|
||||||
|
// prefix is the tenant ID. So we set it here.
|
||||||
|
//
|
||||||
|
// The proper way to implement this is to pass the custom extension
|
||||||
|
// in spec, but we don't have a way to do that yet in the python tests.
|
||||||
|
custom_extensions: Some(vec![self.tenant_id.to_string()]),
|
||||||
};
|
};
|
||||||
let spec_path = self.endpoint_path().join("spec.json");
|
let spec_path = self.endpoint_path().join("spec.json");
|
||||||
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
|
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
|
||||||
@@ -517,6 +529,11 @@ impl Endpoint {
|
|||||||
.stdin(std::process::Stdio::null())
|
.stdin(std::process::Stdio::null())
|
||||||
.stderr(logfile.try_clone()?)
|
.stderr(logfile.try_clone()?)
|
||||||
.stdout(logfile);
|
.stdout(logfile);
|
||||||
|
|
||||||
|
if let Some(remote_ext_config) = remote_ext_config {
|
||||||
|
cmd.args(["--remote-ext-config", remote_ext_config]);
|
||||||
|
}
|
||||||
|
|
||||||
let child = cmd.spawn()?;
|
let child = cmd.spawn()?;
|
||||||
|
|
||||||
// Write down the pid so we can wait for it when we want to stop
|
// Write down the pid so we can wait for it when we want to stop
|
||||||
|
|||||||
183
docs/rfcs/024-extension-loading.md
Normal file
183
docs/rfcs/024-extension-loading.md
Normal file
@@ -0,0 +1,183 @@
# Supporting custom user Extensions (Dynamic Extension Loading)

Created 2023-05-03

## Motivation

There are many extensions in the PostgreSQL ecosystem, and not all extensions
are of a quality that we can confidently support. Additionally, our
current extension inclusion mechanism has several problems because we build all
extensions into the primary Compute image: we build the extensions every time
we build the compute image regardless of whether we actually need to rebuild
the image, and the inclusion of these extensions in the image adds a hard
dependency on all supported extensions - thus increasing the image size, and
with it the time it takes to download that image - increasing first start
latency.

This RFC proposes a dynamic loading mechanism that solves most of these
problems.

## Summary

`compute_ctl` is made responsible for loading extensions on-demand into
the container's file system for dynamically loaded extensions, and will also
make sure that the extensions in `shared_preload_libraries` are downloaded
before the compute node starts.

## Components

compute_ctl, PostgreSQL, neon (extension), Compute Host Node, Extension Store

## Requirements

Compute nodes with no extra extensions should not be negatively impacted by
the existence of support for many extensions.

Installing an extension into PostgreSQL should be easy.

Non-preloaded extensions shouldn't impact startup latency.

Uninstalled extensions shouldn't impact query latency.

A small latency penalty for dynamically loaded extensions is acceptable in
the first seconds of compute startup, but not in steady-state operation.

## Proposed implementation

### On-demand, JIT-loading of extensions

Before postgres starts we download:
- control files for all extensions available to that compute node;
- all `shared_preload_libraries`.

After postgres is running, `compute_ctl` listens for requests to load files.
When PostgreSQL requests a file, `compute_ctl` downloads it.

PostgreSQL requests files in the following cases (a sketch of the resulting download flow follows this list):
- when loading a preload library set in `local_preload_libraries`;
- when explicitly loading a library with `LOAD`;
- when creating an extension with `CREATE EXTENSION` (download SQL scripts, optional extension data files, and optional library files).
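To make that flow concrete, here is a minimal sketch (in Rust, since `compute_ctl` is a Rust binary) of how a requested file could be mapped onto the compute's PostgreSQL installation before being fetched from the extension store. The helper name `target_path` and the install-directory layout are assumptions for illustration, not the actual `compute_ctl` code.

```rust
use std::path::{Path, PathBuf};

/// Hypothetical helper: decide where a requested file should land inside the
/// compute's PostgreSQL installation. A real implementation would then stream
/// the corresponding object from the extension store into this path.
fn target_path(pg_install: &Path, filename: &str, is_library: bool) -> PathBuf {
    if is_library {
        // e.g. `LOAD 'postgis-3'` -> lib/postgis-3.so
        let file = if filename.ends_with(".so") {
            filename.to_string()
        } else {
            format!("{filename}.so")
        };
        pg_install.join("lib").join(file)
    } else {
        // e.g. `CREATE EXTENSION postgis` -> share/extension/postgis.control,
        // postgis--3.1.sql, ...
        pg_install.join("share/extension").join(filename)
    }
}

fn main() {
    let pg_install = Path::new("/usr/local/pgsql");
    println!("{}", target_path(pg_install, "postgis-3", true).display());
    println!("{}", target_path(pg_install, "postgis.control", false).display());
}
```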
#### Summary

Pros:
- Startup is only as slow as it takes to load all (shared_)preload_libraries
- Supports BYO Extension

Cons:
- O(sizeof(extensions)) IO requirement for loading all extensions.

### Alternative solutions

1. Allow users to add their extensions to the base image

   Pros:
   - Easy to deploy

   Cons:
   - Doesn't scale - first start size is dependent on image size;
   - All extensions are shared across all users: it doesn't allow users to
     bring their own restrictive-licensed extensions

2. Bring Your Own compute image

   Pros:
   - Still easy to deploy
   - User can bring their own patched version of PostgreSQL

   Cons:
   - First start latency is O(sizeof(extensions image))
   - Warm instance pool for skipping pod schedule latency is not feasible with
     O(n) custom images
   - Support channels are difficult to manage

3. Download all user extensions in bulk on compute start

   Pros:
   - Easy to deploy
   - No startup latency issues for "clean" users.
   - Warm instance pool for skipping pod schedule latency is possible

   Cons:
   - Downloading all extensions in advance takes a lot of time, thus startup
     latency issues

4. Store user's extensions in persistent storage

   Pros:
   - Easy to deploy
   - No startup latency issues
   - Warm instance pool for skipping pod schedule latency is possible

   Cons:
   - EC2 instances have only a limited number of attachments shared between EBS
     volumes, direct-attached NVMe drives, and ENIs.
   - Compute instance migration isn't trivially solved for EBS mounts (e.g.
     the device is unavailable whilst moving the mount between instances).
   - EBS can only mount on one instance at a time (except the expensive IO2
     device type).

5. Store user's extensions in a network drive

   Pros:
   - Easy to deploy
   - Few startup latency issues
   - Warm instance pool for skipping pod schedule latency is possible

   Cons:
   - We'd need networked drives, and a lot of them, which would store many
     duplicate extensions.
   - **UNCHECKED:** Compute instance migration may not work nicely with
     networked IOs

### Extensions of this idea

The extension store does not have to be S3 directly, but could be a Node-local
caching service on top of S3. This would reduce the load on the network for
popular extensions.

## Extension Storage implementation

Extension Storage in our case is an S3 bucket with a "directory" per build and postgres version,
where extension files are stored as plain files in the bucket, following the same directory structure as in the PostgreSQL installation.

i.e.

`s3://<the-bucket>/<build-version>/<postgres-version>/lib/postgis-3.1.so`
`s3://<the-bucket>/<build-version>/<postgres-version>/share/extension/postgis.control`
`s3://<the-bucket>/<build-version>/<postgres-version>/share/extension/postgis--3.1.sql`

To handle custom extensions that are available only to specific users, we use per-prefix subdirectories (a key-construction sketch follows):

i.e.
`s3://<the-bucket>/<build-version>/<postgres-version>/<custom-ext-prefix>/lib/ext-name.so`, etc.
`s3://<the-bucket>/<build-version>/<postgres-version>/<custom-ext-prefix>/share/extension/ext-name.control`, etc.
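As an illustration of this layout, below is a small sketch of how the object keys could be built. The `ExtKey` type, its field names, and the example values are assumptions made for this sketch, not the actual `compute_ctl` code.

```rust
/// Hypothetical key builder for the layout above.
struct ExtKey<'a> {
    build_version: &'a str,
    pg_version: &'a str,
    /// `None` for public extensions, `Some(prefix)` for custom ones.
    custom_prefix: Option<&'a str>,
}

impl ExtKey<'_> {
    /// Key of an extension's control file.
    fn control_file(&self, ext_name: &str) -> String {
        match self.custom_prefix {
            Some(p) => format!(
                "{}/{}/{}/share/extension/{}.control",
                self.build_version, self.pg_version, p, ext_name
            ),
            None => format!(
                "{}/{}/share/extension/{}.control",
                self.build_version, self.pg_version, ext_name
            ),
        }
    }

    /// Key of a shared library file.
    fn library(&self, lib_name: &str) -> String {
        match self.custom_prefix {
            Some(p) => format!("{}/{}/{}/lib/{}.so", self.build_version, self.pg_version, p, lib_name),
            None => format!("{}/{}/lib/{}.so", self.build_version, self.pg_version, lib_name),
        }
    }
}

fn main() {
    // Example values only; real build/postgres versions come from the compute spec.
    let key = ExtKey { build_version: "release-4242", pg_version: "v15", custom_prefix: None };
    assert_eq!(key.control_file("postgis"), "release-4242/v15/share/extension/postgis.control");
    println!("{}", key.library("postgis-3.1"));
}
```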
On compute start, `compute_ctl` accepts a list of custom_ext_prefixes.

To get the list of available extensions, `compute_ctl` downloads control files from all prefixes (see the listing sketch below):

`s3://<the-bucket>/<build-version>/<postgres-version>/share/extension/`
`s3://<the-bucket>/<build-version>/<postgres-version>/<custom-ext-prefix1>/share/extension/`
`s3://<the-bucket>/<build-version>/<postgres-version>/<custom-ext-prefix2>/share/extension/`
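A minimal sketch of building that set of listing prefixes; the function name and example values are assumptions for illustration, not the actual implementation. Each resulting prefix would then be passed to a remote-storage listing call (for example the `list_files` helper added in this change) to discover the available `.control` files.

```rust
/// Hypothetical: build the "share/extension/" prefixes that compute_ctl would
/// list control files under, given the public prefix plus any custom ones.
fn control_file_prefixes(
    build_version: &str,
    pg_version: &str,
    custom_ext_prefixes: &[String],
) -> Vec<String> {
    let mut prefixes = vec![format!("{build_version}/{pg_version}/share/extension/")];
    for p in custom_ext_prefixes {
        prefixes.push(format!("{build_version}/{pg_version}/{p}/share/extension/"));
    }
    prefixes
}

fn main() {
    // Example values only.
    let prefixes = control_file_prefixes(
        "release-4242",
        "v15",
        &["tenant-abc".to_string(), "tenant-def".to_string()],
    );
    for p in prefixes {
        println!("{p}");
    }
}
```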
### How to add a new extension to the Extension Storage?

Simply upload build artifacts to the S3 bucket.
Implement a CI step for that, splitting it from the compute-node-image build.

### How do we deal with extension versions and updates?

Currently, we rebuild extensions on every compute-node-image build and store them in the <build-version> prefix.
This is needed to ensure that `/share` and `/lib` files are in sync.

For extension updates, we rely on the PostgreSQL extension versioning mechanism (SQL update scripts) and on extension authors not breaking backwards compatibility within one major version of PostgreSQL.

### Alternatives

For extensions written in trusted languages we can also adopt
`dbdev`, the PostgreSQL Package Manager based on `pg_tle` by Supabase.
This would increase the number of supported extensions and decrease the amount of work required to support them.
@@ -73,6 +73,7 @@ pub struct ComputeMetrics {
     pub basebackup_ms: u64,
     pub config_ms: u64,
     pub total_startup_ms: u64,
+    pub load_libraries_ms: u64,
 }

 /// Response of the `/computes/{compute_id}/spec` control-plane API.
@@ -60,6 +60,9 @@ pub struct ComputeSpec {
     /// If set, 'storage_auth_token' is used as the password to authenticate to
     /// the pageserver and safekeepers.
     pub storage_auth_token: Option<String>,
+
+    // list of prefixes to search for custom extensions in remote extension storage
+    pub custom_extensions: Option<Vec<String>>,
 }

 #[serde_as]
@@ -184,6 +184,20 @@ pub enum GenericRemoteStorage {
 }

 impl GenericRemoteStorage {
+    // A function for listing all the files in a "directory"
+    // Example:
+    // list_files("foo/bar") = ["foo/bar/a.txt", "foo/bar/b.txt"]
+    pub async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
+        match self {
+            Self::LocalFs(s) => s.list_files(folder).await,
+            Self::AwsS3(s) => s.list_files(folder).await,
+            Self::Unreliable(s) => s.list_files(folder).await,
+        }
+    }
+
+    // lists common *prefixes*, if any, of files
+    // Example:
+    // list_prefixes("foo123","foo567","bar123","bar432") = ["foo", "bar"]
     pub async fn list_prefixes(
         &self,
         prefix: Option<&RemotePath>,
@@ -195,14 +209,6 @@ impl GenericRemoteStorage {
         }
     }
-
-    pub async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
-        match self {
-            Self::LocalFs(s) => s.list_files(folder).await,
-            Self::AwsS3(s) => s.list_files(folder).await,
-            Self::Unreliable(s) => s.list_files(folder).await,
-        }
-    }

     pub async fn upload(
         &self,
         from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
@@ -349,10 +349,17 @@ impl RemoteStorage for S3Bucket {

     /// See the doc for `RemoteStorage::list_files`
     async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
-        let folder_name = folder
+        let mut folder_name = folder
             .map(|p| self.relative_path_to_s3_object(p))
             .or_else(|| self.prefix_in_bucket.clone());
+
+        // remove leading "/" if one exists
+        if let Some(folder_name_slash) = folder_name.clone() {
+            if folder_name_slash.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
+                folder_name = Some(folder_name_slash[1..].to_string());
+            }
+        }

         // AWS may need to break the response into several parts
         let mut continuation_token = None;
         let mut all_files = vec![];
@@ -4,6 +4,7 @@
 MODULE_big = neon
 OBJS = \
 	$(WIN32RES) \
+	extension_server.o \
 	file_cache.o \
 	libpagestore.o \
 	libpqwalproposer.o \
pgxn/neon/extension_server.c (new file, 104 lines)
@@ -0,0 +1,104 @@
/*-------------------------------------------------------------------------
 *
 * extension_server.c
 *	  Request compute_ctl to download extension files.
 *
 * IDENTIFICATION
 *	 contrib/neon/extension_server.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"
#include "tcop/pquery.h"
#include "tcop/utility.h"
#include "access/xact.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
#include "commands/defrem.h"
#include "miscadmin.h"
#include "utils/acl.h"
#include "fmgr.h"
#include "utils/guc.h"
#include "port.h"
#include "fmgr.h"

#include <curl/curl.h>

static int extension_server_port = 0;

static download_extension_file_hook_type prev_download_extension_file_hook = NULL;

// to download all SQL (and data) files for an extension:
// curl -X POST http://localhost:8080/extension_server/postgis
// it covers two possible extension files layouts:
// 1. extension_name--version--platform.sql
// 2. extension_name/extension_name--version.sql
//    extension_name/extra_files.csv
//
// to download a specific library file:
// curl -X POST http://localhost:8080/extension_server/postgis-3.so?is_library=true
static bool
neon_download_extension_file_http(const char *filename, bool is_library)
{
	CURL	   *curl;
	CURLcode	res;
	char	   *compute_ctl_url;
	char	   *postdata;
	bool		ret = false;

	if ((curl = curl_easy_init()) == NULL)
	{
		elog(ERROR, "Failed to initialize curl handle");
	}

	compute_ctl_url = psprintf("http://localhost:%d/extension_server/%s%s",
			extension_server_port, filename, is_library ? "?is_library=true" : "");

	elog(LOG, "Sending request to compute_ctl: %s", compute_ctl_url);

	curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST");
	curl_easy_setopt(curl, CURLOPT_URL, compute_ctl_url);
	// NOTE: 15L may be insufficient time for large extensions like postgis
	curl_easy_setopt(curl, CURLOPT_TIMEOUT, 15L /* seconds */);

	if (curl)
	{
		/* Perform the request, res will get the return code */
		res = curl_easy_perform(curl);
		/* Check for errors */
		if (res == CURLE_OK)
		{
			ret = true;
		}
		else
		{
			// Don't error here because postgres will try to find the file
			// and will fail with some proper error message if it's not found.
			elog(WARNING, "neon_download_extension_file_http failed: %s\n", curl_easy_strerror(res));
		}

		/* always cleanup */
		curl_easy_cleanup(curl);
	}

	return ret;
}

void pg_init_extension_server()
{
	// Port to connect to compute_ctl on localhost
	// to request extension files.
	DefineCustomIntVariable("neon.extension_server_port",
			"connection string to the compute_ctl",
			NULL,
			&extension_server_port,
			0, 0, INT_MAX,
			PGC_POSTMASTER,
			0, /* no flags required */
			NULL, NULL, NULL);

	// set download_extension_file_hook
	prev_download_extension_file_hook = download_extension_file_hook;
	download_extension_file_hook = neon_download_extension_file_http;
}
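For context, the curl-based hook above POSTs to a small HTTP endpoint that `compute_ctl` serves on localhost. Below is a minimal, std-only Rust sketch of such a handler; the port, routing, and plain responses are assumptions for illustration, and the actual download logic is deliberately omitted.

```rust
use std::io::{Read, Write};
use std::net::TcpListener;

fn main() -> std::io::Result<()> {
    // Port is an assumption for this sketch; the real port is communicated to
    // the neon extension via the `neon.extension_server_port` GUC shown above.
    let listener = TcpListener::bind("127.0.0.1:8080")?;
    for stream in listener.incoming() {
        let mut stream = stream?;
        // A single read is enough for this sketch; a real server would parse
        // the HTTP request properly.
        let mut buf = [0u8; 4096];
        let n = stream.read(&mut buf)?;
        let request = String::from_utf8_lossy(&buf[..n]);
        // Request line looks like:
        // "POST /extension_server/postgis-3.so?is_library=true HTTP/1.1"
        let target = request
            .lines()
            .next()
            .and_then(|line| line.split_whitespace().nth(1))
            .unwrap_or("");
        if let Some(rest) = target.strip_prefix("/extension_server/") {
            let (filename, query) = rest.split_once('?').unwrap_or((rest, ""));
            let is_library = query.contains("is_library=true");
            // A real implementation would fetch the file from the extension
            // store here and place it under the right lib/ or share/ path.
            println!("would download {filename} (library: {is_library})");
            stream.write_all(b"HTTP/1.1 200 OK\r\ncontent-length: 0\r\n\r\n")?;
        } else {
            stream.write_all(b"HTTP/1.1 404 Not Found\r\ncontent-length: 0\r\n\r\n")?;
        }
    }
    Ok(())
}
```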
pgxn/neon/extension_server.h (new file, 1 line)
@@ -0,0 +1 @@
@@ -35,8 +35,11 @@ _PG_init(void)
 {
 	pg_init_libpagestore();
 	pg_init_walproposer();

 	InitControlPlaneConnector();
+
+	pg_init_extension_server();
+
 	// Important: This must happen after other parts of the extension
 	// are loaded, otherwise any settings to GUCs that were set before
 	// the extension was loaded will be removed.
@@ -21,6 +21,8 @@ extern char *neon_tenant;
 extern void pg_init_libpagestore(void);
 extern void pg_init_walproposer(void);

+extern void pg_init_extension_server(void);
+
 /*
  * Returns true if we shouldn't do REDO on that block in record indicated by
  * block_id; false otherwise.
@@ -529,6 +529,16 @@ def available_remote_storages() -> List[RemoteStorageKind]:
     return remote_storages


+def available_s3_storages() -> List[RemoteStorageKind]:
+    remote_storages = [RemoteStorageKind.MOCK_S3]
+    if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE") is not None:
+        remote_storages.append(RemoteStorageKind.REAL_S3)
+        log.info("Enabling real s3 storage for tests")
+    else:
+        log.info("Using mock implementations to test remote storage")
+    return remote_storages
+
+
 @dataclass
 class LocalFsStorage:
     root: Path
@@ -549,6 +559,16 @@ class S3Storage:
             "AWS_SECRET_ACCESS_KEY": self.secret_key,
         }

+    def to_string(self) -> str:
+        return json.dumps(
+            {
+                "bucket": self.bucket_name,
+                "region": self.bucket_region,
+                "endpoint": self.endpoint,
+                "prefix": self.prefix_in_bucket,
+            }
+        )
+

 RemoteStorage = Union[LocalFsStorage, S3Storage]

@@ -615,10 +635,12 @@ class NeonEnvBuilder:
         self.rust_log_override = rust_log_override
         self.port_distributor = port_distributor
         self.remote_storage = remote_storage
+        self.ext_remote_storage: Optional[S3Storage] = None
+        self.remote_storage_client: Optional[Any] = None
         self.remote_storage_users = remote_storage_users
         self.broker = broker
         self.run_id = run_id
-        self.mock_s3_server = mock_s3_server
+        self.mock_s3_server: MockS3Server = mock_s3_server
         self.pageserver_config_override = pageserver_config_override
         self.num_safekeepers = num_safekeepers
         self.safekeepers_id_start = safekeepers_id_start
@@ -666,15 +688,24 @@ class NeonEnvBuilder:
         remote_storage_kind: RemoteStorageKind,
         test_name: str,
         force_enable: bool = True,
+        enable_remote_extensions: bool = False,
     ):
         if remote_storage_kind == RemoteStorageKind.NOOP:
             return
         elif remote_storage_kind == RemoteStorageKind.LOCAL_FS:
             self.enable_local_fs_remote_storage(force_enable=force_enable)
         elif remote_storage_kind == RemoteStorageKind.MOCK_S3:
-            self.enable_mock_s3_remote_storage(bucket_name=test_name, force_enable=force_enable)
+            self.enable_mock_s3_remote_storage(
+                bucket_name=test_name,
+                force_enable=force_enable,
+                enable_remote_extensions=enable_remote_extensions,
+            )
         elif remote_storage_kind == RemoteStorageKind.REAL_S3:
-            self.enable_real_s3_remote_storage(test_name=test_name, force_enable=force_enable)
+            self.enable_real_s3_remote_storage(
+                test_name=test_name,
+                force_enable=force_enable,
+                enable_remote_extensions=enable_remote_extensions,
+            )
         else:
             raise RuntimeError(f"Unknown storage type: {remote_storage_kind}")

@@ -688,11 +719,15 @@ class NeonEnvBuilder:
         assert force_enable or self.remote_storage is None, "remote storage is enabled already"
         self.remote_storage = LocalFsStorage(Path(self.repo_dir / "local_fs_remote_storage"))

-    def enable_mock_s3_remote_storage(self, bucket_name: str, force_enable: bool = True):
+    def enable_mock_s3_remote_storage(
+        self, bucket_name: str, force_enable: bool = True, enable_remote_extensions: bool = False
+    ):
         """
         Sets up the pageserver to use the S3 mock server, creates the bucket, if it's not present already.
         Starts up the mock server, if that does not run yet.
         Errors, if the pageserver has some remote storage configuration already, unless `force_enable` is not set to `True`.
+
+        Also creates the bucket for extensions, self.ext_remote_storage bucket
         """
         assert force_enable or self.remote_storage is None, "remote storage is enabled already"
         mock_endpoint = self.mock_s3_server.endpoint()
@@ -713,9 +748,22 @@ class NeonEnvBuilder:
             bucket_region=mock_region,
             access_key=self.mock_s3_server.access_key(),
             secret_key=self.mock_s3_server.secret_key(),
+            prefix_in_bucket="pageserver",
         )

-    def enable_real_s3_remote_storage(self, test_name: str, force_enable: bool = True):
+        if enable_remote_extensions:
+            self.ext_remote_storage = S3Storage(
+                bucket_name=bucket_name,
+                endpoint=mock_endpoint,
+                bucket_region=mock_region,
+                access_key=self.mock_s3_server.access_key(),
+                secret_key=self.mock_s3_server.secret_key(),
+                prefix_in_bucket="ext",
+            )
+
+    def enable_real_s3_remote_storage(
+        self, test_name: str, force_enable: bool = True, enable_remote_extensions: bool = False
+    ):
         """
         Sets up configuration to use real s3 endpoint without mock server
         """
@@ -752,9 +800,18 @@ class NeonEnvBuilder:
             bucket_region=region,
             access_key=access_key,
             secret_key=secret_key,
-            prefix_in_bucket=self.remote_storage_prefix,
+            prefix_in_bucket=f"{self.remote_storage_prefix}/pageserver",
         )

+        if enable_remote_extensions:
+            self.ext_remote_storage = S3Storage(
+                bucket_name=bucket_name,
+                bucket_region=region,
+                access_key=access_key,
+                secret_key=secret_key,
+                prefix_in_bucket=f"{self.remote_storage_prefix}/ext",
+            )
+
     def cleanup_local_storage(self):
         if self.preserve_database_files:
             return
@@ -788,6 +845,7 @@ class NeonEnvBuilder:
         # `self.remote_storage_prefix` is coupled with `S3Storage` storage type,
         # so this line effectively a no-op
         assert isinstance(self.remote_storage, S3Storage)
+        assert self.remote_storage_client is not None

         if self.keep_remote_storage_contents:
             log.info("keep_remote_storage_contents skipping remote storage cleanup")
@@ -917,6 +975,8 @@ class NeonEnv:
         self.neon_binpath = config.neon_binpath
         self.pg_distrib_dir = config.pg_distrib_dir
         self.endpoint_counter = 0
+        self.remote_storage_client = config.remote_storage_client
+        self.ext_remote_storage = config.ext_remote_storage

         # generate initial tenant ID here instead of letting 'neon init' generate it,
         # so that we don't need to dig it out of the config file afterwards.
@@ -1503,6 +1563,7 @@ class NeonCli(AbstractNeonCli):
         safekeepers: Optional[List[int]] = None,
         tenant_id: Optional[TenantId] = None,
         lsn: Optional[Lsn] = None,
+        remote_ext_config: Optional[str] = None,
     ) -> "subprocess.CompletedProcess[str]":
         args = [
             "endpoint",
@@ -1512,6 +1573,8 @@ class NeonCli(AbstractNeonCli):
             "--pg-version",
             self.env.pg_version,
         ]
+        if remote_ext_config is not None:
+            args.extend(["--remote-ext-config", remote_ext_config])
         if lsn is not None:
             args.append(f"--lsn={lsn}")
         args.extend(["--pg-port", str(pg_port)])
@@ -2373,7 +2436,7 @@ class Endpoint(PgProtocol):

         return self

-    def start(self) -> "Endpoint":
+    def start(self, remote_ext_config: Optional[str] = None) -> "Endpoint":
         """
         Start the Postgres instance.
         Returns self.
@@ -2389,6 +2452,7 @@ class Endpoint(PgProtocol):
             http_port=self.http_port,
             tenant_id=self.tenant_id,
             safekeepers=self.active_safekeepers,
+            remote_ext_config=remote_ext_config,
         )
         self.running = True

@@ -2478,6 +2542,7 @@ class Endpoint(PgProtocol):
         hot_standby: bool = False,
         lsn: Optional[Lsn] = None,
         config_lines: Optional[List[str]] = None,
+        remote_ext_config: Optional[str] = None,
    ) -> "Endpoint":
        """
        Create an endpoint, apply config, and start Postgres.
@@ -2492,7 +2557,7 @@ class Endpoint(PgProtocol):
             config_lines=config_lines,
             hot_standby=hot_standby,
             lsn=lsn,
-        ).start()
+        ).start(remote_ext_config=remote_ext_config)

         log.info(f"Postgres startup took {time.time() - started_at} seconds")

@@ -2526,6 +2591,7 @@ class EndpointFactory:
         lsn: Optional[Lsn] = None,
         hot_standby: bool = False,
         config_lines: Optional[List[str]] = None,
+        remote_ext_config: Optional[str] = None,
     ) -> Endpoint:
         ep = Endpoint(
             self.env,
@@ -2542,6 +2608,7 @@ class EndpointFactory:
             hot_standby=hot_standby,
             config_lines=config_lines,
             lsn=lsn,
+            remote_ext_config=remote_ext_config,
         )

     def create(
@@ -89,6 +89,9 @@ class TenantId(Id):
     def __repr__(self) -> str:
         return f'`TenantId("{self.id.hex()}")'

+    def __str__(self) -> str:
+        return self.id.hex()
+

 class TimelineId(Id):
     def __repr__(self) -> str:
test_runner/regress/test_download_extensions.py (new file, 250 lines)
@@ -0,0 +1,250 @@
import os
from contextlib import closing
from io import BytesIO

import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
    NeonEnvBuilder,
    PgBin,
    RemoteStorageKind,
    available_s3_storages,
)
from fixtures.pg_version import PgVersion
from fixtures.types import TenantId

NUM_EXT = 3


def control_file_content(owner, i):
    output = f"""# mock {owner} extension{i}
comment = 'This is a mock extension'
default_version = '1.0'
module_pathname = '$libdir/test_ext{i}'
relocatable = true"""
    return BytesIO(bytes(output, "utf-8"))


def sql_file_content():
    output = """
        CREATE FUNCTION test_ext_add(integer, integer) RETURNS integer
        AS 'select $1 + $2;'
        LANGUAGE SQL
        IMMUTABLE
        RETURNS NULL ON NULL INPUT;
    """
    return BytesIO(bytes(output, "utf-8"))


def fake_library_content():
    return BytesIO(bytes("\n111\n", "utf-8"))


# Prepare some mock extension files and upload them to the bucket;
# returns a list of files that should be cleaned up after the test.
def prepare_mock_ext_storage(
    pg_version: PgVersion,
    tenant_id: TenantId,
    pg_bin: PgBin,
    ext_remote_storage,
    remote_storage_client,
):
    bucket_prefix = ext_remote_storage.prefix_in_bucket
    custom_prefix = str(tenant_id)

    PUB_EXT_ROOT = f"v{pg_version}/share/extension"
    PRIVATE_EXT_ROOT = f"v{pg_version}/{custom_prefix}/share/extension"
    LOCAL_EXT_ROOT = f"pg_install/v{pg_version}/share/postgresql/extension"

    PUB_LIB_ROOT = f"v{pg_version}/lib"
    PRIVATE_LIB_ROOT = f"v{pg_version}/{custom_prefix}/lib"
    LOCAL_LIB_ROOT = f"{pg_bin.pg_lib_dir}/postgresql"

    log.info(
        f"""
        PUB_EXT_ROOT: {PUB_EXT_ROOT}
        PRIVATE_EXT_ROOT: {PRIVATE_EXT_ROOT}
        LOCAL_EXT_ROOT: {LOCAL_EXT_ROOT}
        PUB_LIB_ROOT: {PUB_LIB_ROOT}
        PRIVATE_LIB_ROOT: {PRIVATE_LIB_ROOT}
        LOCAL_LIB_ROOT: {LOCAL_LIB_ROOT}
        """
    )

    cleanup_files = []

    # Upload several test_ext{i}.control files to the bucket
    for i in range(NUM_EXT):
        public_remote_name = f"{bucket_prefix}/{PUB_EXT_ROOT}/test_ext{i}.control"
        custom_remote_name = f"{bucket_prefix}/{PRIVATE_EXT_ROOT}/custom_ext{i}.control"
        cleanup_files += [
            f"{LOCAL_EXT_ROOT}/test_ext{i}.control",
            f"{LOCAL_EXT_ROOT}/custom_ext{i}.control",
        ]

        log.info(f"Uploading control file to {public_remote_name}")
        remote_storage_client.upload_fileobj(
            control_file_content("public", i), ext_remote_storage.bucket_name, public_remote_name
        )
        log.info(f"Uploading control file to {custom_remote_name}")
        remote_storage_client.upload_fileobj(
            control_file_content(str(tenant_id), i),
            ext_remote_storage.bucket_name,
            custom_remote_name,
        )

    # Upload SQL file for the extension we're going to create
    sql_filename = "test_ext0--1.0.sql"
    test_sql_public_remote_path = f"{bucket_prefix}/{PUB_EXT_ROOT}/{sql_filename}"
    remote_storage_client.upload_fileobj(
        sql_file_content(),
        ext_remote_storage.bucket_name,
        test_sql_public_remote_path,
    )
    cleanup_files += [f"{LOCAL_EXT_ROOT}/{sql_filename}"]

    # upload some fake library files
    for i in range(2):
        public_remote_name = f"{bucket_prefix}/{PUB_LIB_ROOT}/test_lib{i}.so"
        custom_remote_name = f"{bucket_prefix}/{PRIVATE_LIB_ROOT}/custom_lib{i}.so"

        log.info(f"uploading fake library to {public_remote_name}")
        remote_storage_client.upload_fileobj(
            fake_library_content(),
            ext_remote_storage.bucket_name,
            public_remote_name,
        )

        log.info(f"uploading fake library to {custom_remote_name}")
        remote_storage_client.upload_fileobj(
            fake_library_content(),
            ext_remote_storage.bucket_name,
            custom_remote_name,
        )
        cleanup_files += [f"{LOCAL_LIB_ROOT}/test_lib{i}.so", f"{LOCAL_LIB_ROOT}/custom_lib{i}.so"]

    return cleanup_files


# Generate mock extension files and upload them to the bucket.
#
# Then check that compute nodes can download them and use them
# to CREATE EXTENSION and LOAD 'library.so'
#
# NOTE: You must have appropriate AWS credentials to run the REAL_S3 test.
# It may also be necessary to set the following environment variables:
# export AWS_ACCESS_KEY_ID='test'
# export AWS_SECRET_ACCESS_KEY='test'
# export AWS_SECURITY_TOKEN='test'
# export AWS_SESSION_TOKEN='test'
# export AWS_DEFAULT_REGION='us-east-1'


@pytest.mark.parametrize("remote_storage_kind", available_s3_storages())
def test_remote_extensions(
    neon_env_builder: NeonEnvBuilder,
    remote_storage_kind: RemoteStorageKind,
    pg_version: PgVersion,
    pg_bin: PgBin,
):
    neon_env_builder.enable_remote_storage(
        remote_storage_kind=remote_storage_kind,
        test_name="test_remote_extensions",
        enable_remote_extensions=True,
    )
    neon_env_builder.num_safekeepers = 3
    env = neon_env_builder.init_start()
    tenant_id, _ = env.neon_cli.create_tenant()
    env.neon_cli.create_timeline("test_remote_extensions", tenant_id=tenant_id)

    assert env.ext_remote_storage is not None
    assert env.remote_storage_client is not None

    # Prepare some mock extension files and upload them to the bucket
    cleanup_files = prepare_mock_ext_storage(
        pg_version,
        tenant_id,
        pg_bin,
        env.ext_remote_storage,
        env.remote_storage_client,
    )
    # Start a compute node and check that it can download the extensions
    # and use them to CREATE EXTENSION and LOAD 'library.so'
    #
    # This block is wrapped in a try/finally so that the downloaded files
    # are cleaned up even if the test fails
    try:
        endpoint = env.endpoints.create_start(
            "test_remote_extensions",
            tenant_id=tenant_id,
            remote_ext_config=env.ext_remote_storage.to_string(),
            # config_lines=["log_min_messages=debug3"],
        )
        with closing(endpoint.connect()) as conn:
            with conn.cursor() as cur:
                # Test query: check that test_ext0 was successfully downloaded
                cur.execute("SELECT * FROM pg_available_extensions")
                all_extensions = [x[0] for x in cur.fetchall()]
                log.info(all_extensions)
                for i in range(NUM_EXT):
                    assert f"test_ext{i}" in all_extensions
                    assert f"custom_ext{i}" in all_extensions

                cur.execute("CREATE EXTENSION test_ext0")
                cur.execute("SELECT extname FROM pg_extension")
                all_extensions = [x[0] for x in cur.fetchall()]
                log.info(all_extensions)
                assert "test_ext0" in all_extensions

                # Try to load existing library file
                try:
                    cur.execute("LOAD 'test_lib0.so'")
                except Exception as e:
                    # expected to fail with
                    # could not load library ... test_lib0.so: file too short
                    # because test_lib0.so is not a real library file
                    log.info("LOAD test_lib0.so failed (expectedly): %s", e)
                    assert "file too short" in str(e)

                # Try to load custom library file
                try:
                    cur.execute("LOAD 'custom_lib0.so'")
                except Exception as e:
                    # expected to fail with
                    # could not load library ... custom_lib0.so: file too short
                    # because custom_lib0.so is not a real library file
                    log.info("LOAD custom_lib0.so failed (expectedly): %s", e)
                    assert "file too short" in str(e)

                # Try to load existing library file without .so extension
                try:
                    cur.execute("LOAD 'test_lib1'")
                except Exception as e:
                    # expected to fail with
                    # could not load library ... test_lib1.so: file too short
                    # because test_lib1.so is not a real library file
                    log.info("LOAD test_lib1 failed (expectedly): %s", e)
                    assert "file too short" in str(e)

                # Try to load non-existent library file
                try:
                    cur.execute("LOAD 'test_lib_fail.so'")
                except Exception as e:
                    # expected to fail because test_lib_fail.so is not found
                    log.info("LOAD test_lib_fail.so failed (expectedly): %s", e)
                    assert (
                        """could not access file "test_lib_fail.so": No such file or directory"""
                        in str(e)
                    )

    finally:
        # This is important because if the files aren't cleaned up then the test can
        # pass even without successfully downloading the files, if a previous run (or
        # a run with a different type of remote storage) of the test did download the
        # files.
        for file in cleanup_files:
            try:
                os.remove(file)
                log.info(f"Deleted {file}")
            except FileNotFoundError:
                log.info(f"{file} does not exist, so cannot be deleted")
@@ -275,6 +275,7 @@ def assert_prefix_empty(neon_env_builder: NeonEnvBuilder, prefix: Optional[str]
     assert isinstance(neon_env_builder.remote_storage, S3Storage)

     # Note that this doesnt use pagination, so list is not guaranteed to be exhaustive.
+    assert neon_env_builder.remote_storage_client is not None
     response = neon_env_builder.remote_storage_client.list_objects_v2(
         Bucket=neon_env_builder.remote_storage.bucket_name,
         Prefix=prefix or neon_env_builder.remote_storage.prefix_in_bucket or "",
@@ -628,7 +629,7 @@ def test_timeline_delete_works_for_remote_smoke(
     )

     # for some reason the check above doesnt immediately take effect for the below.
-    # Assume it is mock server incosistency and check twice.
+    # Assume it is mock server inconsistency and check twice.
     wait_until(
         2,
         0.5,
vendor/postgres-v14 (vendored submodule)
Submodule vendor/postgres-v14 updated: 1144aee166...93a5ee7749
vendor/postgres-v15 (vendored submodule)
Submodule vendor/postgres-v15 updated: 1984832c74...293a06e5e1