Mirror of https://github.com/neondatabase/neon.git (synced 2026-02-04 19:20:36 +00:00)

Compare commits: split-prox ... extension_ (95 commits)
| SHA1 |
|---|
| 38b0db734e |
| 316082d770 |
| 8b586ea748 |
| b4abbfe6fb |
| 06763949c0 |
| a6c9a4abe7 |
| 0875c2284c |
| 2089d02f94 |
| a6f142c4f6 |
| ce0da2889e |
| ceebc8e870 |
| 1d407f937f |
| b402dfd9c7 |
| 5e882d8359 |
| b999b27655 |
| ebe503e6f8 |
| 604d2643d4 |
| 7357b7cad5 |
| a2e154f07b |
| 7667fdc5c8 |
| 85d02acdb4 |
| 5f986875bf |
| 1104de0b9b |
| a8f848b5de |
| ca59330df8 |
| 36bb5ad527 |
| 6532daf528 |
| 8bc128e474 |
| c3994541eb |
| 7cdcc8a500 |
| 4201f4695f |
| 5776df15da |
| 31aa0283b0 |
| fd3dfe9d52 |
| 384e3ab1a8 |
| 4259464f72 |
| 152206211b |
| 9c35c06c58 |
| 245b4c9d72 |
| 3d0f74fc0c |
| ce55f70cac |
| 053d592ddb |
| b85416b58d |
| 195838436c |
| 9313045de6 |
| 44ac7a45be |
| e35e8a7dcb |
| a79b0d69c4 |
| d475e901e5 |
| bf3b83b504 |
| 94781e8710 |
| 4b83a206bf |
| f984f9e7d3 |
| 6b42464c23 |
| 605c30e5c5 |
| 0b11d8e836 |
| 7602483af9 |
| 5e1e859ab8 |
| 85a7511700 |
| 89b8ea132e |
| bfbae98f24 |
| 02a1d4d8c1 |
| 4a35f29301 |
| 559e318328 |
| a4d236b02f |
| 8b9f72e117 |
| bb414e5a0a |
| 32c03bc784 |
| c99e203094 |
| e7b9259675 |
| 356f7d3a7e |
| 0f6b05337e |
| 2e81d280c8 |
| f9700c8bb9 |
| e6137d45d2 |
| ab1d903600 |
| bfd670b9a7 |
| 5e96ab43ea |
| 890061d371 |
| 6b74d1a76a |
| a936b8a92b |
| c7bea52849 |
| 1b7ab6d468 |
| e07d5d00e9 |
| 15d3d007eb |
| 77157c7741 |
| b9b1b3596c |
| 91a809332f |
| 214ecacfc4 |
| 7465c644b9 |
| bb931f2ce0 |
| 8013f9630d |
| 34f22e9b12 |
| 4f3f817384 |
| b50475b567 |
---

Cargo.lock (generated)

@@ -924,12 +924,14 @@ dependencies = [
  "opentelemetry",
  "postgres",
  "regex",
+ "remote_storage",
  "reqwest",
  "serde",
  "serde_json",
  "tar",
  "tokio",
  "tokio-postgres",
+ "toml_edit",
  "tracing",
  "tracing-opentelemetry",
  "tracing-subscriber",

@@ -997,6 +999,7 @@ dependencies = [
  "tar",
  "thiserror",
  "toml",
+ "tracing",
  "url",
  "utils",
  "workspace_hack",

---

@@ -30,3 +30,5 @@ url.workspace = true
 compute_api.workspace = true
 utils.workspace = true
 workspace_hack.workspace = true
+toml_edit.workspace = true
+remote_storage = { version = "0.1", path = "../libs/remote_storage/" }

---

@@ -27,7 +27,8 @@
 //! compute_ctl -D /var/db/postgres/compute \
 //!             -C 'postgresql://cloud_admin@localhost/postgres' \
 //!             -S /var/db/postgres/specs/current.json \
-//!             -b /usr/local/bin/postgres
+//!             -b /usr/local/bin/postgres \
+//!             -r {"bucket": "my-bucket", "region": "eu-central-1"}
 //! ```
 //!
 use std::collections::HashMap;

@@ -35,7 +36,7 @@ use std::fs::File;
 use std::panic;
 use std::path::Path;
 use std::process::exit;
-use std::sync::{mpsc, Arc, Condvar, Mutex};
+use std::sync::{mpsc, Arc, Condvar, Mutex, OnceLock};
 use std::{thread, time::Duration};
 
 use anyhow::{Context, Result};

@@ -48,6 +49,7 @@ use compute_api::responses::ComputeStatus;
 
 use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
 use compute_tools::configurator::launch_configurator;
+use compute_tools::extension_server::{get_pg_version, init_remote_storage};
 use compute_tools::http::api::launch_http_server;
 use compute_tools::logger::*;
 use compute_tools::monitor::launch_monitor;

@@ -64,6 +66,20 @@ fn main() -> Result<()> {
     info!("build_tag: {build_tag}");
 
     let matches = cli().get_matches();
+    let pgbin_default = String::from("postgres");
+    let pgbin = matches.get_one::<String>("pgbin").unwrap_or(&pgbin_default);
+
+    let remote_ext_config = matches.get_one::<String>("remote-ext-config");
+    let ext_remote_storage = match remote_ext_config {
+        Some(x) => match init_remote_storage(x) {
+            Ok(y) => Some(y),
+            Err(e) => {
+                dbg!("Error {:?}, setting remote storage to None", e);
+                None
+            }
+        },
+        None => None,
+    };
 
     let http_port = *matches
         .get_one::<u16>("http-port")

@@ -128,9 +144,6 @@ fn main() -> Result<()> {
     let compute_id = matches.get_one::<String>("compute-id");
     let control_plane_uri = matches.get_one::<String>("control-plane-uri");
 
-    // Try to use just 'postgres' if no path is provided
-    let pgbin = matches.get_one::<String>("pgbin").unwrap();
-
     let spec;
     let mut live_config_allowed = false;
     match spec_json {

@@ -168,6 +181,7 @@ fn main() -> Result<()> {
 
     let mut new_state = ComputeState::new();
     let spec_set;
+
     if let Some(spec) = spec {
         let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
         new_state.pspec = Some(pspec);

@@ -179,9 +193,13 @@ fn main() -> Result<()> {
         connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
         pgdata: pgdata.to_string(),
         pgbin: pgbin.to_string(),
+        pgversion: get_pg_version(pgbin),
         live_config_allowed,
         state: Mutex::new(new_state),
         state_changed: Condvar::new(),
+        ext_remote_storage,
+        available_libraries: OnceLock::new(),
+        available_extensions: OnceLock::new(),
     };
     let compute = Arc::new(compute_node);
 

@@ -190,6 +208,8 @@ fn main() -> Result<()> {
     let _http_handle =
         launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread");
 
+    let extension_server_port: u16 = http_port;
+
     if !spec_set {
         // No spec provided, hang waiting for it.
         info!("no compute spec provided, waiting");

@@ -230,7 +250,7 @@ fn main() -> Result<()> {
     // Start Postgres
     let mut delay_exit = false;
     let mut exit_code = None;
-    let pg = match compute.start_compute() {
+    let pg = match compute.start_compute(extension_server_port) {
         Ok(pg) => Some(pg),
         Err(err) => {
             error!("could not start the compute node: {:?}", err);

@@ -349,6 +369,12 @@ fn cli() -> clap::Command {
                 .long("control-plane-uri")
                 .value_name("CONTROL_PLANE_API_BASE_URI"),
         )
+        .arg(
+            Arg::new("remote-ext-config")
+                .short('r')
+                .long("remote-ext-config")
+                .value_name("REMOTE_EXT_CONFIG"),
+        )
 }
 
 #[test]

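For context, the JSON string passed via the new `--remote-ext-config` flag is parsed by `init_remote_storage` (added further down in this diff): `bucket` and `region` are required, while `endpoint` and `prefix` are optional. A minimal sketch of driving it directly — the endpoint and prefix values below are illustrative assumptions, not taken from this change:

```rust
use compute_tools::extension_server::init_remote_storage;

fn main() -> anyhow::Result<()> {
    // Only "bucket" and "region" are required; "endpoint" and "prefix" are
    // optional and shown here with made-up values.
    let config = r#"{
        "bucket": "my-bucket",
        "region": "eu-central-1",
        "endpoint": "http://127.0.0.1:9000",
        "prefix": "ext"
    }"#;
    let _storage = init_remote_storage(config)?;
    Ok(())
}
```
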
---

@@ -1,13 +1,15 @@
+use std::collections::HashMap;
 use std::fs;
 use std::os::unix::fs::PermissionsExt;
 use std::path::Path;
 use std::process::{Command, Stdio};
 use std::str::FromStr;
-use std::sync::{Condvar, Mutex};
+use std::sync::{Condvar, Mutex, OnceLock};
 
 use anyhow::{Context, Result};
 use chrono::{DateTime, Utc};
 use postgres::{Client, NoTls};
+use tokio;
 use tokio_postgres;
 use tracing::{info, instrument, warn};
 use utils::id::{TenantId, TimelineId};

@@ -16,9 +18,11 @@ use utils::lsn::Lsn;
 use compute_api::responses::{ComputeMetrics, ComputeStatus};
 use compute_api::spec::{ComputeMode, ComputeSpec};
 
-use crate::config;
+use remote_storage::{GenericRemoteStorage, RemotePath};
+
 use crate::pg_helpers::*;
 use crate::spec::*;
+use crate::{config, extension_server};
 
 /// Compute node info shared across several `compute_ctl` threads.
 pub struct ComputeNode {

@@ -26,6 +30,7 @@ pub struct ComputeNode {
     pub connstr: url::Url,
     pub pgdata: String,
     pub pgbin: String,
+    pub pgversion: String,
     /// We should only allow live re- / configuration of the compute node if
     /// it uses 'pull model', i.e. it can go to control-plane and fetch
     /// the latest configuration. Otherwise, there could be a case:

@@ -45,6 +50,10 @@ pub struct ComputeNode {
     pub state: Mutex<ComputeState>,
     /// `Condvar` to allow notifying waiters about state changes.
     pub state_changed: Condvar,
+    /// S3 extensions configuration variables
+    pub ext_remote_storage: Option<GenericRemoteStorage>,
+    pub available_libraries: OnceLock<HashMap<String, RemotePath>>,
+    pub available_extensions: OnceLock<HashMap<String, Vec<RemotePath>>>,
 }
 
 #[derive(Clone, Debug)]

@@ -323,14 +332,22 @@ impl ComputeNode {
     /// Do all the preparations like PGDATA directory creation, configuration,
     /// safekeepers sync, basebackup, etc.
     #[instrument(skip(self, compute_state))]
-    pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
+    pub fn prepare_pgdata(
+        &self,
+        compute_state: &ComputeState,
+        extension_server_port: u16,
+    ) -> Result<()> {
         let pspec = compute_state.pspec.as_ref().expect("spec must be set");
         let spec = &pspec.spec;
         let pgdata_path = Path::new(&self.pgdata);
 
         // Remove/create an empty pgdata directory and put configuration there.
         self.create_pgdata()?;
-        config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &pspec.spec)?;
+        config::write_postgres_conf(
+            &pgdata_path.join("postgresql.conf"),
+            &pspec.spec,
+            Some(extension_server_port),
+        )?;
 
         // Syncing safekeepers is only safe with primary nodes: if a primary
         // is already connected it will be kicked out, so a secondary (standby)

@@ -472,7 +489,7 @@ impl ComputeNode {
 
         // Write new config
         let pgdata_path = Path::new(&self.pgdata);
-        config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec)?;
+        config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec, None)?;
 
         let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
         self.pg_reload_conf(&mut client)?;

@@ -502,7 +519,7 @@ impl ComputeNode {
     }
 
     #[instrument(skip(self))]
-    pub fn start_compute(&self) -> Result<std::process::Child> {
+    pub fn start_compute(&self, extension_server_port: u16) -> Result<std::process::Child> {
         let compute_state = self.state.lock().unwrap().clone();
         let pspec = compute_state.pspec.as_ref().expect("spec must be set");
         info!(

@@ -513,7 +530,9 @@ impl ComputeNode {
             pspec.timeline_id,
         );
 
-        self.prepare_pgdata(&compute_state)?;
+        self.prepare_external_extensions(&compute_state)?;
+
+        self.prepare_pgdata(&compute_state, extension_server_port)?;
 
         let start_time = Utc::now();
 

@@ -649,4 +668,131 @@ LIMIT 100",
             "{{\"pg_stat_statements\": []}}".to_string()
         }
     }
+
+    // If remote extension storage is configured,
+    // download extension control files
+    // and shared preload libraries.
+    #[tokio::main]
+    pub async fn prepare_external_extensions(&self, compute_state: &ComputeState) -> Result<()> {
+        if let Some(ref ext_remote_storage) = self.ext_remote_storage {
+            let pspec = compute_state.pspec.as_ref().expect("spec must be set");
+            // download preload shared libraries before postgres start (if any)
+            let spec = &pspec.spec;
+
+            // 1. parse private extension paths from spec
+            let private_ext_prefixes = match &spec.private_extensions {
+                Some(private_extensions) => private_extensions.clone(),
+                None => Vec::new(),
+            };
+
+            info!("private_ext_prefixes: {:?}", &private_ext_prefixes);
+
+            // 2. parse shared_preload_libraries from spec
+            let mut libs_vec = Vec::new();
+
+            if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
+                libs_vec = libs
+                    .split(&[',', '\'', ' '])
+                    .filter(|s| *s != "neon" && !s.is_empty())
+                    .map(str::to_string)
+                    .collect();
+            }
+
+            info!(
+                "shared_preload_libraries parsed from spec.cluster.settings: {:?}",
+                libs_vec
+            );
+
+            // also parse shared_preload_libraries from provided postgresql.conf
+            // that is used in neon_local and python tests
+            if let Some(conf) = &spec.cluster.postgresql_conf {
+                let conf_lines = conf.split('\n').collect::<Vec<&str>>();
+
+                let mut shared_preload_libraries_line = "";
+                for line in conf_lines {
+                    if line.starts_with("shared_preload_libraries") {
+                        shared_preload_libraries_line = line;
+                    }
+                }
+
+                let mut preload_libs_vec = Vec::new();
+                if let Some(libs) = shared_preload_libraries_line.split("='").nth(1) {
+                    preload_libs_vec = libs
+                        .split(&[',', '\'', ' '])
+                        .filter(|s| *s != "neon" && !s.is_empty())
+                        .map(str::to_string)
+                        .collect();
+                }
+
+                info!(
+                    "shared_preload_libraries parsed from spec.cluster.postgresql_conf: {:?}",
+                    preload_libs_vec
+                );
+
+                libs_vec.extend(preload_libs_vec);
+            }
+
+            // download extension control files & shared_preload_libraries
+
+            let available_extensions = extension_server::get_available_extensions(
+                ext_remote_storage,
+                &self.pgbin,
+                &self.pgversion,
+                &private_ext_prefixes,
+            )
+            .await?;
+            self.available_extensions
+                .set(available_extensions)
+                .expect("available_extensions.set error");
+
+            info!("Libraries to download: {:?}", &libs_vec);
+            let available_libraries = extension_server::get_available_libraries(
+                ext_remote_storage,
+                &self.pgbin,
+                &self.pgversion,
+                &private_ext_prefixes,
+                &libs_vec,
+            )
+            .await?;
+            self.available_libraries
+                .set(available_libraries)
+                .expect("available_libraries.set error");
+        }
+
+        Ok(())
+    }
+
+    pub async fn download_extension_sql_files(&self, filename: String) -> Result<()> {
+        match &self.ext_remote_storage {
+            None => anyhow::bail!("No remote extension storage"),
+            Some(remote_storage) => {
+                extension_server::download_extension_sql_files(
+                    &filename,
+                    remote_storage,
+                    &self.pgbin,
+                    self.available_extensions
+                        .get()
+                        .context("available_extensions broke")?,
+                )
+                .await
+            }
+        }
+    }
+
+    pub async fn download_library_file(&self, filename: String) -> Result<()> {
+        match &self.ext_remote_storage {
+            None => anyhow::bail!("No remote extension storage"),
+            Some(remote_storage) => {
+                extension_server::download_library_file(
+                    &filename,
+                    remote_storage,
+                    &self.pgbin,
+                    self.available_libraries
+                        .get()
+                        .context("available_libraries broke")?,
+                )
+                .await
+            }
+        }
+    }
 }

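To make the library-list parsing in `prepare_external_extensions` concrete, here is a standalone sketch (not part of the diff) of the same split/filter chain applied to a `shared_preload_libraries` value:

```rust
fn parse_preload_libraries(libs: &str) -> Vec<String> {
    // Same chain as in prepare_external_extensions: split on quotes, commas
    // and spaces, drop empty fragments, and drop the always-present "neon".
    libs.split(&[',', '\'', ' '])
        .filter(|s| *s != "neon" && !s.is_empty())
        .map(str::to_string)
        .collect()
}

fn main() {
    let libs = parse_preload_libraries("'neon, pg_stat_statements, test_lib0'");
    assert_eq!(libs, vec!["pg_stat_statements", "test_lib0"]);
}
```
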
---

@@ -33,7 +33,11 @@ pub fn line_in_file(path: &Path, line: &str) -> Result<bool> {
 }
 
 /// Create or completely rewrite configuration file specified by `path`
-pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> {
+pub fn write_postgres_conf(
+    path: &Path,
+    spec: &ComputeSpec,
+    extension_server_port: Option<u16>,
+) -> Result<()> {
     // File::create() destroys the file content if it exists.
     let mut file = File::create(path)?;
 

@@ -95,5 +99,9 @@ pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> {
         writeln!(file, "# Managed by compute_ctl: end")?;
     }
 
+    if let Some(port) = extension_server_port {
+        writeln!(file, "neon.extension_server_port={}", port)?;
+    }
+
     Ok(())
 }

---

compute_tools/src/extension_server.rs (new file, 379 lines)

// Download extension files from the extension store
// and put them in the right place in the postgres directory
use anyhow::{self, bail, Context, Result};
use remote_storage::*;
use serde_json::{self, Value};
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::num::{NonZeroU32, NonZeroUsize};
use std::path::{Path, PathBuf};
use std::str;
use tokio::io::AsyncReadExt;
use tracing::info;

const SHARE_EXT_PATH: &str = "share/postgresql/extension";

fn get_pg_config(argument: &str, pgbin: &str) -> String {
    // gives the result of `pg_config [argument]`
    // where argument is a flag like `--version` or `--sharedir`
    let pgconfig = pgbin.replace("postgres", "pg_config");
    let config_output = std::process::Command::new(pgconfig)
        .arg(argument)
        .output()
        .expect("pg_config error");
    std::str::from_utf8(&config_output.stdout)
        .expect("pg_config error")
        .trim()
        .to_string()
}

pub fn get_pg_version(pgbin: &str) -> String {
    // pg_config --version returns a (platform specific) human readable string
    // such as "PostgreSQL 15.4". We parse this to v14/v15
    let human_version = get_pg_config("--version", pgbin);
    if human_version.contains("15") {
        return "v15".to_string();
    } else if human_version.contains("14") {
        return "v14".to_string();
    }
    panic!("Unsuported postgres version {human_version}");
}

async fn download_helper(
    remote_storage: &GenericRemoteStorage,
    remote_from_path: &RemotePath,
    remote_from_prefix: Option<&Path>,
    download_location: &Path,
) -> anyhow::Result<()> {
    // downloads file at remote_from_path to download_location/[file_name]

    // we cannot use remote_from_path.object_name() here
    // because extension files can be in subdirectories of the extension store.
    //
    // To handle this, we use remote_from_prefix to strip the prefix from the path
    // this gives us the relative path of the file in the extension store,
    // and we use this relative path to construct the local path.
    //
    let local_path = match remote_from_prefix {
        Some(prefix) => {
            let p = remote_from_path
                .get_path()
                .strip_prefix(prefix)
                .expect("bad prefix");

            download_location.join(p)
        }
        None => download_location.join(remote_from_path.object_name().expect("bad object")),
    };

    if local_path.exists() {
        info!("File {:?} already exists. Skipping download", &local_path);
        return Ok(());
    }

    info!(
        "Downloading {:?} to location {:?}",
        &remote_from_path, &local_path
    );
    let mut download = remote_storage.download(remote_from_path).await?;
    let mut write_data_buffer = Vec::new();
    download
        .download_stream
        .read_to_end(&mut write_data_buffer)
        .await?;
    if remote_from_prefix.is_some() {
        if let Some(prefix) = local_path.parent() {
            info!(
                "Downloading file with prefix. Create directory {:?}",
                prefix
            );
            // if directory already exists, this is a no-op
            std::fs::create_dir_all(prefix)?;
        }
    }

    let mut output_file = BufWriter::new(File::create(local_path)?);
    output_file.write_all(&write_data_buffer)?;
    Ok(())
}

// download extension control files
//
// if private_ext_prefixes is provided - search also in private extension paths
//
pub async fn get_available_extensions(
    remote_storage: &GenericRemoteStorage,
    pgbin: &str,
    pg_version: &str,
    private_ext_prefixes: &Vec<String>,
) -> anyhow::Result<HashMap<String, Vec<RemotePath>>> {
    let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");

    let mut paths: Vec<RemotePath> = Vec::new();
    // public extensions
    paths.push(RemotePath::new(
        &Path::new(pg_version).join(SHARE_EXT_PATH),
    )?);
    // private extensions
    for private_prefix in private_ext_prefixes {
        paths.push(RemotePath::new(
            &Path::new(pg_version)
                .join(private_prefix)
                .join(SHARE_EXT_PATH),
        )?);
    }

    let all_available_files = list_files_in_prefixes_for_extensions(remote_storage, &paths).await?;

    info!(
        "list of available_extension files {:?}",
        &all_available_files
    );

    // download all control files
    for (obj_name, obj_paths) in &all_available_files {
        for obj_path in obj_paths {
            if obj_name.ends_with("control") {
                download_helper(remote_storage, obj_path, None, &local_sharedir).await?;
            }
        }
    }

    Ok(all_available_files)
}

// Download requested shared_preload_libraries
//
// Note that tenant_id is not optional here, because we only download libraries
// after we know the tenant spec and the tenant_id.
//
// return list of all library files to use it in the future searches
pub async fn get_available_libraries(
    remote_storage: &GenericRemoteStorage,
    pgbin: &str,
    pg_version: &str,
    private_ext_prefixes: &Vec<String>,
    preload_libraries: &Vec<String>,
) -> anyhow::Result<HashMap<String, RemotePath>> {
    let local_libdir: PathBuf = Path::new(&get_pg_config("--pkglibdir", pgbin)).into();
    // Construct a hashmap of all available libraries
    // example (key, value) pair: test_lib0.so, v14/lib/test_lib0.so

    let mut paths: Vec<RemotePath> = Vec::new();
    // public libraries
    paths.push(
        RemotePath::new(&Path::new(&pg_version).join("lib/"))
            .expect("The hard coded path here is valid"),
    );
    // private libraries
    for private_prefix in private_ext_prefixes {
        paths.push(
            RemotePath::new(&Path::new(&pg_version).join(private_prefix).join("lib"))
                .expect("The hard coded path here is valid"),
        );
    }

    let all_available_libraries = list_files_in_prefixes(remote_storage, &paths).await?;

    info!("list of library files {:?}", &all_available_libraries);

    // download all requested libraries
    for lib_name in preload_libraries {
        // add file extension if it isn't in the filename
        let lib_name_with_ext = enforce_so_end(lib_name);
        info!("looking for library {:?}", &lib_name_with_ext);

        match all_available_libraries.get(&*lib_name_with_ext) {
            Some(remote_path) => {
                download_helper(remote_storage, remote_path, None, &local_libdir).await?
            }
            None => {
                let file_path = local_libdir.join(&lib_name_with_ext);
                if file_path.exists() {
                    info!("File {:?} already exists. Skipping download", &file_path);
                } else {
                    bail!("Shared library file {lib_name} is not found in the extension store")
                }
            }
        }
    }

    Ok(all_available_libraries)
}

// download all sqlfiles (and possibly data files) for a given extension name
//
pub async fn download_extension_sql_files(
    ext_name: &str,
    remote_storage: &GenericRemoteStorage,
    pgbin: &str,
    all_available_files: &HashMap<String, Vec<RemotePath>>,
) -> Result<()> {
    let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
    let mut downloaded_something = false;

    if let Some(files) = all_available_files.get(ext_name) {
        for file in files {
            if file.extension().context("bad file name")? != "control" {
                // find files prefix to handle cases when extension files are stored
                // in a directory with the same name as the extension
                // example:
                // share/postgresql/extension/extension_name/extension_name--1.0.sql
                let index = file
                    .get_path()
                    .to_str()
                    .context("invalid path")?
                    .find(ext_name)
                    .context("invalid path")?;

                let prefix_str =
                    file.get_path().to_str().context("invalid path")?[..index].to_string();
                let remote_from_prefix = if prefix_str.is_empty() {
                    None
                } else {
                    Some(Path::new(&prefix_str))
                };

                download_helper(remote_storage, file, remote_from_prefix, &local_sharedir).await?;
                downloaded_something = true;
            }
        }
    }
    if !downloaded_something {
        bail!("Files for extension {ext_name} are not found in the extension store");
    }
    Ok(())
}

// appends an .so suffix to libname if it does not already have one
fn enforce_so_end(libname: &str) -> String {
    if !libname.ends_with(".so") {
        format!("{}.so", libname)
    } else {
        libname.to_string()
    }
}

// download shared library file
pub async fn download_library_file(
    lib_name: &str,
    remote_storage: &GenericRemoteStorage,
    pgbin: &str,
    all_available_libraries: &HashMap<String, RemotePath>,
) -> Result<()> {
    let local_libdir: PathBuf = Path::new(&get_pg_config("--pkglibdir", pgbin)).into();
    let lib_name_with_ext = enforce_so_end(lib_name);
    info!("looking for library {:?}", &lib_name_with_ext);
    match all_available_libraries.get(&*lib_name_with_ext) {
        Some(remote_path) => {
            download_helper(remote_storage, remote_path, None, &local_libdir).await?
        }
        None => bail!("Shared library file {lib_name} is not found in the extension store"),
    }
    Ok(())
}

pub fn init_remote_storage(remote_ext_config: &str) -> anyhow::Result<GenericRemoteStorage> {
    let remote_ext_config: serde_json::Value = serde_json::from_str(remote_ext_config)?;
    let remote_ext_bucket = match &remote_ext_config["bucket"] {
        Value::String(x) => x,
        _ => bail!("remote_ext_config missing bucket"),
    };
    let remote_ext_region = match &remote_ext_config["region"] {
        Value::String(x) => x,
        _ => bail!("remote_ext_config missing region"),
    };
    let remote_ext_endpoint = match &remote_ext_config["endpoint"] {
        Value::String(x) => Some(x.clone()),
        _ => None,
    };
    let remote_ext_prefix = match &remote_ext_config["prefix"] {
        Value::String(x) => Some(x.clone()),
        _ => None,
    };

    // load will not be large, so default parameters are fine
    let config = S3Config {
        bucket_name: remote_ext_bucket.to_string(),
        bucket_region: remote_ext_region.to_string(),
        prefix_in_bucket: remote_ext_prefix,
        endpoint: remote_ext_endpoint,
        concurrency_limit: NonZeroUsize::new(100).expect("100 != 0"),
        max_keys_per_list_response: None,
    };
    let config = RemoteStorageConfig {
        max_concurrent_syncs: NonZeroUsize::new(100).expect("100 != 0"),
        max_sync_errors: NonZeroU32::new(100).expect("100 != 0"),
        storage: RemoteStorageKind::AwsS3(config),
    };
    GenericRemoteStorage::from_config(&config)
}

// helper to collect all files in the given prefixes
// returns hashmap of (file_name, file_remote_path)
async fn list_files_in_prefixes(
    remote_storage: &GenericRemoteStorage,
    paths: &Vec<RemotePath>,
) -> Result<HashMap<String, RemotePath>> {
    let mut res = HashMap::new();

    for path in paths {
        for file in remote_storage.list_files(Some(path)).await? {
            res.insert(
                file.object_name().expect("bad object").to_owned(),
                file.to_owned(),
            );
        }
    }

    Ok(res)
}

// helper to extract extension name
// extension files can be in subdirectories of the extension store.
// examples of layout:
//
// share/postgresql/extension/extension_name--1.0.sql
//
// or
//
// share/postgresql/extension/extension_name/extension_name--1.0.sql
// share/postgresql/extension/extension_name/extra_data.csv
//
// Note: we **assume** that the extension files is in one of these formats.
// If it is not, this code will not download it.
fn get_ext_name(path: &str) -> Result<&str> {
    let path_suffix: Vec<&str> = path.split(&format!("{SHARE_EXT_PATH}/")).collect();

    let path_suffix = path_suffix.last().expect("bad ext name");
    // the order of these is important
    // otherwise we'll return incorrect extension name
    // for path like share/postgresql/extension/extension_name/extension_name--1.0.sql
    for index in ["/", "--"] {
        if let Some(index) = path_suffix.find(index) {
            return Ok(&path_suffix[..index]);
        }
    }
    Ok(path_suffix)
}

// helper to collect files of given prefixes for extensions
// and group them by extension
// returns a hashmap of (extension_name, Vector of remote paths for all files needed for this extension)
async fn list_files_in_prefixes_for_extensions(
    remote_storage: &GenericRemoteStorage,
    paths: &Vec<RemotePath>,
) -> Result<HashMap<String, Vec<RemotePath>>> {
    let mut result = HashMap::new();
    for path in paths {
        for file in remote_storage.list_files(Some(path)).await? {
            let file_ext_name = get_ext_name(file.get_path().to_str().context("invalid path")?)?;
            let ext_file_list = result
                .entry(file_ext_name.to_string())
                .or_insert(Vec::new());
            ext_file_list.push(file.to_owned());
        }
    }
    Ok(result)
}

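The name-extraction rule in `get_ext_name` is easiest to see on concrete paths. A test-style sketch (not part of the new file) of the behaviour implied by the layout comments above:

```rust
#[cfg(test)]
mod tests {
    use super::get_ext_name;

    #[test]
    fn extension_name_is_extracted_from_both_layouts() {
        // flat layout: .../extension/<name>--<version>.sql
        assert_eq!(
            get_ext_name("v14/share/postgresql/extension/postgis--3.3.sql").unwrap(),
            "postgis"
        );
        // nested layout: .../extension/<name>/<extra files>
        assert_eq!(
            get_ext_name("v14/share/postgresql/extension/postgis/extra_data.csv").unwrap(),
            "postgis"
        );
        // control files contain neither "/" nor "--" after the shared prefix,
        // so the whole file name is used as the key
        assert_eq!(
            get_ext_name("v14/share/postgresql/extension/postgis.control").unwrap(),
            "postgis.control"
        );
    }
}
```
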
---

@@ -121,8 +121,62 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
             }
         }
 
+        // download extension files from S3 on demand
+        (&Method::POST, route) if route.starts_with("/extension_server/") => {
+            info!("serving {:?} POST request", route);
+            info!("req.uri {:?}", req.uri());
+
+            let mut is_library = false;
+
+            if let Some(params) = req.uri().query() {
+                info!("serving {:?} POST request with params: {}", route, params);
+
+                if params == "is_library=true" {
+                    is_library = true;
+                } else {
+                    let mut resp = Response::new(Body::from("Wrong request parameters"));
+                    *resp.status_mut() = StatusCode::BAD_REQUEST;
+                    return resp;
+                }
+            }
+
+            let filename = route.split('/').last().unwrap().to_string();
+
+            info!(
+                "serving /extension_server POST request, filename: {:?} is_library: {}",
+                filename, is_library
+            );
+
+            if is_library {
+                match compute.download_library_file(filename.to_string()).await {
+                    Ok(_) => Response::new(Body::from("OK")),
+                    Err(e) => {
+                        error!("library download failed: {}", e);
+                        let mut resp = Response::new(Body::from(e.to_string()));
+                        *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
+                        resp
+                    }
+                }
+            } else {
+                match compute
+                    .download_extension_sql_files(filename.to_string())
+                    .await
+                {
+                    Ok(_) => Response::new(Body::from("OK")),
+                    Err(e) => {
+                        error!("extension download failed: {}", e);
+                        let mut resp = Response::new(Body::from(e.to_string()));
+                        *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
+                        resp
+                    }
+                }
+            }
+        }
+
         // Return the `404 Not Found` for any other routes.
-        _ => {
+        method => {
+            info!("404 Not Found for {:?}", method);
+
             let mut not_found = Response::new(Body::from("404 Not Found"));
             *not_found.status_mut() = StatusCode::NOT_FOUND;
             not_found

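For reference, a hypothetical client-side sketch of the new route. In the intended flow it is Postgres itself that issues this request, using the port written to `neon.extension_server_port`; the port and file names below are assumptions for illustration only:

```rust
async fn request_downloads() -> anyhow::Result<()> {
    let client = reqwest::Client::new();

    // Ask compute_ctl to download the SQL/data files of an extension.
    let resp = client
        .post("http://localhost:3080/extension_server/postgis")
        .send()
        .await?;
    println!("extension files: {}", resp.status());

    // Ask for a shared library instead; note the is_library query parameter.
    let resp = client
        .post("http://localhost:3080/extension_server/test_lib0.so?is_library=true")
        .send()
        .await?;
    println!("library file: {}", resp.status());
    Ok(())
}
```
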
---

@@ -139,6 +139,34 @@ paths:
             application/json:
               schema:
                 $ref: "#/components/schemas/GenericError"
+  /extension_server:
+    post:
+      tags:
+      - Extension
+      summary: Download extension from S3 to local folder.
+      description: ""
+      operationId: downloadExtension
+      responses:
+        200:
+          description: Extension downloaded
+          content:
+            text/plain:
+              schema:
+                type: string
+                description: Error text or 'OK' if download succeeded.
+                example: "OK"
+        400:
+          description: Request is invalid.
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/GenericError"
+        500:
+          description: Extension download request failed.
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/GenericError"
 
 components:
   securitySchemes:

---

@@ -9,6 +9,7 @@ pub mod http;
 #[macro_use]
 pub mod logger;
 pub mod compute;
+pub mod extension_server;
 pub mod monitor;
 pub mod params;
 pub mod pg_helpers;

---

@@ -124,7 +124,7 @@ pub fn get_spec_from_control_plane(
 pub fn handle_configuration(spec: &ComputeSpec, pgdata_path: &Path) -> Result<()> {
     // File `postgresql.conf` is no longer included into `basebackup`, so just
     // always write all config into it creating new file.
-    config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec)?;
+    config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec, None)?;
 
     update_pg_hba(pgdata_path)?;
 

---

@@ -32,3 +32,4 @@ utils.workspace = true
 
 compute_api.workspace = true
 workspace_hack.workspace = true
+tracing.workspace = true

---

@@ -657,6 +657,8 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
                 .get_one::<String>("endpoint_id")
                 .ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
 
+            let remote_ext_config = sub_args.get_one::<String>("remote-ext-config");
+
             // If --safekeepers argument is given, use only the listed safekeeper nodes.
             let safekeepers =
                 if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {

@@ -698,7 +700,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
                     _ => {}
                 }
                 println!("Starting existing endpoint {endpoint_id}...");
-                endpoint.start(&auth_token, safekeepers)?;
+                endpoint.start(&auth_token, safekeepers, remote_ext_config)?;
             } else {
                 let branch_name = sub_args
                     .get_one::<String>("branch-name")

@@ -742,7 +744,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
                     pg_version,
                     mode,
                 )?;
-                ep.start(&auth_token, safekeepers)?;
+                ep.start(&auth_token, safekeepers, remote_ext_config)?;
             }
         }
         "stop" => {

@@ -1002,6 +1004,12 @@ fn cli() -> Command {
         .help("Additional pageserver's configuration options or overrides, refer to pageserver's 'config-override' CLI parameter docs for more")
         .required(false);
 
+    let remote_ext_config_args = Arg::new("remote-ext-config")
+        .long("remote-ext-config")
+        .num_args(1)
+        .help("Configure the S3 bucket that we search for extensions in.")
+        .required(false);
+
     let lsn_arg = Arg::new("lsn")
         .long("lsn")
         .help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.")

@@ -1152,6 +1160,7 @@ fn cli() -> Command {
                 .arg(pg_version_arg)
                 .arg(hot_standby_arg)
                 .arg(safekeepers_arg)
+                .arg(remote_ext_config_args)
         )
         .subcommand(
             Command::new("stop")

---

@@ -311,7 +311,7 @@ impl Endpoint {
 
         // TODO: use future host field from safekeeper spec
         // Pass the list of safekeepers to the replica so that it can connect to any of them,
-        // whichever is availiable.
+        // whichever is available.
         let sk_ports = self
             .env
             .safekeepers

@@ -408,7 +408,12 @@
         Ok(())
     }
 
-    pub fn start(&self, auth_token: &Option<String>, safekeepers: Vec<NodeId>) -> Result<()> {
+    pub fn start(
+        &self,
+        auth_token: &Option<String>,
+        safekeepers: Vec<NodeId>,
+        remote_ext_config: Option<&String>,
+    ) -> Result<()> {
         if self.status() == "running" {
             anyhow::bail!("The endpoint is already running");
         }

@@ -476,6 +481,7 @@
             pageserver_connstring: Some(pageserver_connstring),
             safekeeper_connstrings,
             storage_auth_token: auth_token.clone(),
+            private_extensions: Some(vec![self.tenant_id.to_string()]), //DEBUG ONLY
         };
         let spec_path = self.endpoint_path().join("spec.json");
         std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;

@@ -507,6 +513,9 @@
             .stdin(std::process::Stdio::null())
             .stderr(logfile.try_clone()?)
             .stdout(logfile);
+        if let Some(remote_ext_config) = remote_ext_config {
+            cmd.args(["--remote-ext-config", remote_ext_config]);
+        }
         let _child = cmd.spawn()?;
 
         // Wait for it to start

301
docs/rfcs/024-extension-loading.md
Normal file
301
docs/rfcs/024-extension-loading.md
Normal file
@@ -0,0 +1,301 @@
|
|||||||
|
# Supporting custom user Extensions
|
||||||
|
|
||||||
|
Created 2023-05-03
|
||||||
|
|
||||||
|
## Motivation
|
||||||
|
|
||||||
|
There are many extensions in the PostgreSQL ecosystem, and not all extensions
|
||||||
|
are of a quality that we can confidently support them. Additionally, our
|
||||||
|
current extension inclusion mechanism has several problems because we build all
|
||||||
|
extensions into the primary Compute image: We build the extensions every time
|
||||||
|
we build the compute image regardless of whether we actually need to rebuild
|
||||||
|
the image, and the inclusion of these extensions in the image adds a hard
|
||||||
|
dependency on all supported extensions - thus increasing the image size, and
|
||||||
|
with it the time it takes to download that image - increasing first start
|
||||||
|
latency.
|
||||||
|
|
||||||
|
This RFC proposes a dynamic loading mechanism that solves most of these
|
||||||
|
problems.
|
||||||
|
|
||||||
|
## Summary
|
||||||
|
|
||||||
|
`compute_ctl` is made responsible for loading extensions on-demand into
|
||||||
|
the container's file system for dynamically loaded extensions, and will also
|
||||||
|
make sure that the extensions in `shared_preload_libraries` are downloaded
|
||||||
|
before the compute node starts.
|
||||||
|
|
||||||
|
## Components
|
||||||
|
|
||||||
|
compute_ctl, PostgreSQL, neon (extension), Compute Host Node, Extension Store
|
||||||
|
|
||||||
|
## Requirements
|
||||||
|
|
||||||
|
Compute nodes with no extra extensions should not be negatively impacted by
|
||||||
|
the existence of support for many extensions.
|
||||||
|
|
||||||
|
Installing an extension into PostgreSQL should be easy.
|
||||||
|
|
||||||
|
Non-preloaded extensions shouldn't impact startup latency.
|
||||||
|
|
||||||
|
Uninstalled extensions shouldn't impact query latency.
|
||||||
|
|
||||||
|
A small latency penalty for dynamically loaded extensions is acceptable in
|
||||||
|
the first seconds of compute startup, but not in steady-state operations.
|
||||||
|
|
||||||
|
## Proposed implementation
|
||||||
|
|
||||||
|
### On-demand, JIT-loading of extensions
|
||||||
|
|
||||||
|
TLDR; we download extensions as soon as we need them, or when we have spare
|
||||||
|
time.
|
||||||
|
|
||||||
|
That means, we first download the extensions required to start the PostMaster
|
||||||
|
(`shared_preload_libraries` and their dependencies), then the libraries required
|
||||||
|
before a backend can start processing user input (`preload_libraries` and
|
||||||
|
dependencies), and then (with network limits applied) the remainder of the
|
||||||
|
configured extensions, with prioritization for installed extensions.
|
||||||
|
|
||||||
|
If PostgreSQL tries to load a library that is not yet fully on disk, it will
|
||||||
|
ask `compute_ctl` first if the extension has been downloaded yet, and will wait
|
||||||
|
for `compute_ctl` to finish downloading that extension. `compute_ctl` will
|
||||||
|
prioritize downloading that extension over other extensions that were not yet
|
||||||
|
requested.
|
||||||
|
|
||||||
|
#### Workflow
|
||||||
|
|
||||||
|
```mermaid
|
||||||
|
sequenceDiagram
|
||||||
|
autonumber
|
||||||
|
participant EX as External (control plane, ...)
|
||||||
|
participant CTL as compute_ctl
|
||||||
|
participant ST as extension store
|
||||||
|
actor PG as PostgreSQL
|
||||||
|
|
||||||
|
EX ->>+ CTL: Start compute with config X
|
||||||
|
|
||||||
|
note over CTL: The configuration contains a list of all <br/>extensions available to that compute node, etc.
|
||||||
|
|
||||||
|
par Optionally parallel or concurrent
|
||||||
|
loop Available extensions
|
||||||
|
CTL ->>+ ST: Download control file of extension
|
||||||
|
activate CTL
|
||||||
|
ST ->>- CTL: Finish downloading control file
|
||||||
|
CTL ->>- CTL: Put control file in extensions directory
|
||||||
|
end
|
||||||
|
|
||||||
|
loop For each extension in shared_preload_libraries
|
||||||
|
CTL ->>+ ST: Download extension's data
|
||||||
|
activate CTL
|
||||||
|
ST ->>- CTL: Finish downloading
|
||||||
|
CTL ->>- CTL: Put extension's files in the right place
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
CTL ->>+ PG: Start PostgreSQL
|
||||||
|
|
||||||
|
note over CTL: PostgreSQL can now start accepting <br/>connections. However, users may still need to wait <br/>for preload_libraries extensions to get downloaded.
|
||||||
|
|
||||||
|
par Load preload_libraries
|
||||||
|
loop For each extension in preload_libraries
|
||||||
|
CTL ->>+ ST: Download extension's data
|
||||||
|
activate CTL
|
||||||
|
ST ->>- CTL: Finish downloading
|
||||||
|
CTL ->>- CTL: Put extension's files in the right place
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
note over CTL: After this, connections don't have any hard <br/>waits for extension files left, except for those <br/>connections that override preload_libraries <br/>in their startup packet
|
||||||
|
|
||||||
|
par PG's internal_load_library(library)
|
||||||
|
alt Library is not yet loaded
|
||||||
|
PG ->>+ CTL: Load library X
|
||||||
|
CTL ->>+ ST: Download the extension that provides X
|
||||||
|
ST ->>- CTL: Finish downloading
|
||||||
|
CTL ->> CTL: Put extension's files in the right place
|
||||||
|
CTL ->>- PG: Ready
|
||||||
|
else Library is already loaded
|
||||||
|
note over PG: No-op
|
||||||
|
end
|
||||||
|
and Download all remaining extensions
|
||||||
|
loop Extension X
|
||||||
|
CTL ->>+ ST: Download not-yet-downloaded extension X
|
||||||
|
activate CTL
|
||||||
|
ST ->>- CTL: Finish downloading
|
||||||
|
CTL ->>- CTL: Put extension's files in the right place
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
deactivate PG
|
||||||
|
deactivate CTL
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Summary
|
||||||
|
|
||||||
|
Pros:
|
||||||
|
- Startup is only as slow as it takes to load all (shared_)preload_libraries
|
||||||
|
- Supports BYO Extension
|
||||||
|
|
||||||
|
Cons:
|
||||||
|
- O(sizeof(extensions)) IO requirement for loading all extensions.
|
||||||
|
|
||||||
|
### Alternative solutions

1. Allow users to add their extensions to the base image

   Pros:
   - Easy to deploy

   Cons:
   - Doesn't scale - first start size is dependent on image size;
   - All extensions are shared across all users: it doesn't allow users to
     bring their own restrictively-licensed extensions

2. Bring Your Own compute image

   Pros:
   - Still easy to deploy
   - User can bring their own patched version of PostgreSQL

   Cons:
   - First start latency is O(sizeof(extensions image))
   - Warm instance pool for skipping pod schedule latency is not feasible with
     O(n) custom images
   - Support channels are difficult to manage

3. Download all user extensions in bulk on compute start

   Pros:
   - Easy to deploy
   - No startup latency issues for "clean" users
   - Warm instance pool for skipping pod schedule latency is possible

   Cons:
   - Downloading all extensions in advance takes a lot of time, which causes
     startup latency issues

4. Store user's extensions in persistent storage

   Pros:
   - Easy to deploy
   - No startup latency issues
   - Warm instance pool for skipping pod schedule latency is possible

   Cons:
   - EC2 instances have only a limited number of attachments shared between EBS
     volumes, direct-attached NVMe drives, and ENIs.
   - Compute instance migration isn't trivially solved for EBS mounts (e.g.
     the device is unavailable whilst moving the mount between instances).
   - EBS can only be mounted on one instance at a time (except the expensive IO2
     device type).

5. Store user's extensions in a network drive

   Pros:
   - Easy to deploy
   - Few startup latency issues
   - Warm instance pool for skipping pod schedule latency is possible

   Cons:
   - We'd need networked drives, and a lot of them, which would store many
     duplicate extensions.
   - **UNCHECKED:** Compute instance migration may not work nicely with
     networked IOs
### Idea extensions

The extension store does not have to be S3 directly, but could be a node-local
caching service on top of S3. This would reduce the load on the network for
popular extensions.
## Extension Store implementation

The Extension Store in our case is a private S3 bucket.
Extensions are stored as tarballs in the bucket. The tarball contains the extension's control file and all the files that the extension needs to run.

We may also store the control file separately from the tarball to speed up extension loading.

`s3://<the-bucket>/extensions/ext-name/sha-256+1234abcd1234abcd1234abcd1234abcd/bundle.tar`

where `ext-name` is an extension name and `sha-256+1234abcd1234abcd1234abcd1234abcd` is a hash of a specific extension version tarball.

To ensure security, there is no direct access to the S3 bucket from the compute node.

The control plane forms a list of extensions available to the compute node
and generates a short-lived [pre-signed URL](https://docs.aws.amazon.com/AmazonS3/latest/userguide/ShareObjectPreSignedURL.html)
for each extension that is available to that compute node.

So `compute_ctl` receives a spec in the following format:

```
"extensions": [{
    "meta_format": 1,
    "extension_name": "postgis",
    "link": "https://<the-bucket>/extensions/sha-256+1234abcd1234abcd1234abcd1234abcd/bundle.tar?AWSAccessKeyId=1234abcd1234abcd1234abcd1234abcd&Expires=1234567890&Signature=1234abcd1234abcd1234abcd1234abcd",
    ...
}]
```

`compute_ctl` then downloads the extension from the link and unpacks it to the right place.
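For illustration, a minimal sketch of that download-and-unpack step, assuming a `serde`/`reqwest`/`tokio`/`tar` setup. The struct fields mirror the spec above, but the helper itself (`fetch_extension_bundle`) is hypothetical and not the actual `compute_ctl` code.

```
use std::path::Path;

use anyhow::Context;
use serde::Deserialize;

// Mirrors one entry of the "extensions" array in the spec above.
#[derive(Deserialize)]
struct RemoteExtension {
    meta_format: u32,
    extension_name: String,
    // Short-lived pre-signed URL produced by the control plane.
    link: String,
}

// Hypothetical helper: fetch the tarball via the pre-signed URL and unpack it
// under the Postgres install dir (share/postgresql/extension and lib/).
async fn fetch_extension_bundle(ext: &RemoteExtension, pg_install_dir: &Path) -> anyhow::Result<()> {
    let bytes = reqwest::get(&ext.link)
        .await
        .with_context(|| format!("downloading {}", ext.extension_name))?
        .bytes()
        .await?;

    // The bundle is a plain tar archive containing control, SQL, and library files.
    let mut archive = tar::Archive::new(bytes.as_ref());
    archive
        .unpack(pg_install_dir)
        .with_context(|| format!("unpacking {}", ext.extension_name))?;
    Ok(())
}
```

Packaging each extension version as a single tarball keeps this to one GET per extension, instead of one request per control, SQL, and library file.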
### How do we handle private extensions?

Private and public extensions are treated equally from the Extension Store perspective.
The only difference is that private extensions are not listed in the user UI (this is managed by the control plane).
### How to add a new extension to the Extension Store?

Since we need to verify that the extension is compatible with the compute node and doesn't contain any malicious code,
we need to review the extension before adding it to the Extension Store.

I do not expect that we will have a lot of extensions to review, so we can do it manually for now.

Some admin UI may be added later to automate this process.

The list of extensions available to a compute node is stored in the console database.
### How is the list of available extensions managed?

We need to add new tables to the console database to store the list of available extensions, their versions, and access rights.

Something like this:

```
CREATE TABLE extensions (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    version VARCHAR(255) NOT NULL,
    hash VARCHAR(255) NOT NULL, -- this is the path to the extension in the Extension Store
    supported_postgres_versions integer[] NOT NULL,
    is_public BOOLEAN NOT NULL, -- public extensions are available to all users
    is_shared_preload BOOLEAN NOT NULL, -- these extensions require a postgres restart
    is_preload BOOLEAN NOT NULL,
    license VARCHAR(255) NOT NULL
);

CREATE TABLE user_extensions (
    user_id INTEGER NOT NULL,
    extension_id INTEGER NOT NULL,
    FOREIGN KEY (user_id) REFERENCES users (id),
    FOREIGN KEY (extension_id) REFERENCES extensions (id)
);
```

When a new extension is added to the Extension Store, we add a new record to the table and set permissions.

In the UI, the user may select the extensions that they want to use with their compute node.

NOTE: Extensions that require a postgres restart will not be available until the next compute restart.
Also, the user currently cannot force a postgres restart; we should add this feature later.

For other extensions, we must communicate updates to `compute_ctl`, and they will be downloaded in the background.
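As a rough illustration of that background path, a hedged sketch (again not the real `compute_ctl` code) of downloading newly granted extensions concurrently once an updated list arrives; `RemoteExtension` here is a trimmed-down version of the hypothetical type sketched earlier, and the target path is a placeholder.

```
use anyhow::Result;

// Hypothetical: a newly delivered spec entry, with only the fields needed here.
struct RemoteExtension {
    extension_name: String,
    link: String,
}

// Hedged sketch: when the control plane re-delivers the spec with new extensions,
// download the ones we don't have yet without blocking running backends.
async fn download_new_extensions(new_entries: Vec<RemoteExtension>) -> Result<()> {
    let mut tasks = Vec::new();
    for ext in new_entries {
        tasks.push(tokio::spawn(async move {
            let bytes = reqwest::get(&ext.link).await?.bytes().await?;
            // Unpack into the Postgres install dir; real path handling omitted.
            tar::Archive::new(bytes.as_ref()).unpack("pg_install")?;
            anyhow::Ok(ext.extension_name)
        }));
    }
    for task in tasks {
        let name = task.await??;
        println!("downloaded extension {name} in the background");
    }
    Ok(())
}
```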
### How can the user update an extension?

The user can update an extension by selecting the new version of the extension in the UI.

### Alternatives

For extensions written in trusted languages, we can also adopt the
`dbdev` PostgreSQL Package Manager based on `pg_tle` by Supabase.
This would increase the number of supported extensions and decrease the amount of work required to support them.
@@ -60,6 +60,8 @@ pub struct ComputeSpec {
     /// If set, 'storage_auth_token' is used as the password to authenticate to
     /// the pageserver and safekeepers.
     pub storage_auth_token: Option<String>,
+
+    pub private_extensions: Option<Vec<String>>,
 }
 
 #[serde_as]
@@ -184,6 +184,20 @@ pub enum GenericRemoteStorage {
 }
 
 impl GenericRemoteStorage {
+    // A function for listing all the files in a "directory"
+    // Example:
+    // list_files("foo/bar") = ["foo/bar/a.txt", "foo/bar/b.txt"]
+    pub async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
+        match self {
+            Self::LocalFs(s) => s.list_files(folder).await,
+            Self::AwsS3(s) => s.list_files(folder).await,
+            Self::Unreliable(s) => s.list_files(folder).await,
+        }
+    }
+
+    // lists common *prefixes*, if any of files
+    // Example:
+    // list_prefixes("foo123","foo567","bar123","bar432") = ["foo", "bar"]
     pub async fn list_prefixes(
         &self,
         prefix: Option<&RemotePath>,
@@ -195,14 +209,6 @@ impl GenericRemoteStorage {
         }
     }
 
-    pub async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
-        match self {
-            Self::LocalFs(s) => s.list_files(folder).await,
-            Self::AwsS3(s) => s.list_files(folder).await,
-            Self::Unreliable(s) => s.list_files(folder).await,
-        }
-    }
-
     pub async fn upload(
         &self,
         from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
@@ -349,10 +349,17 @@ impl RemoteStorage for S3Bucket {
 
     /// See the doc for `RemoteStorage::list_files`
     async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
-        let folder_name = folder
+        let mut folder_name = folder
             .map(|p| self.relative_path_to_s3_object(p))
             .or_else(|| self.prefix_in_bucket.clone());
+
+        // remove leading "/" if one exists
+        if let Some(folder_name_slash) = folder_name.clone() {
+            if folder_name_slash.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
+                folder_name = Some(folder_name_slash[1..].to_string());
+            }
+        }
+
         // AWS may need to break the response into several parts
         let mut continuation_token = None;
         let mut all_files = vec![];
@@ -4,6 +4,7 @@
 MODULE_big = neon
 OBJS = \
     $(WIN32RES) \
+    extension_server.o \
     file_cache.o \
     libpagestore.o \
     libpqwalproposer.o \
pgxn/neon/extension_server.c (new file, 103 lines)
@@ -0,0 +1,103 @@
/*-------------------------------------------------------------------------
 *
 * extension_server.c
 *      Request compute_ctl to download extension files.
 *
 * IDENTIFICATION
 *      contrib/neon/extension_server.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"
#include "tcop/pquery.h"
#include "tcop/utility.h"
#include "access/xact.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
#include "commands/defrem.h"
#include "miscadmin.h"
#include "utils/acl.h"
#include "fmgr.h"
#include "utils/guc.h"
#include "port.h"
#include "fmgr.h"

#include <curl/curl.h>

static int extension_server_port = 0;

static download_extension_file_hook_type prev_download_extension_file_hook = NULL;

// to download all SQL files for an extension:
// curl -X POST http://localhost:8080/extension_server/postgis
//
// to download specific library file:
// curl -X POST http://localhost:8080/extension_server/postgis-3.so?=true
static bool
neon_download_extension_file_http(const char *filename, bool is_library)
{
    CURL       *curl;
    CURLcode    res;
    char       *compute_ctl_url;
    char       *postdata;
    bool        ret = false;

    if ((curl = curl_easy_init()) == NULL)
    {
        elog(ERROR, "Failed to initialize curl handle");
    }

    if (is_library)
    {
        elog(LOG, "request library");
    }

    compute_ctl_url = psprintf("http://localhost:%d/extension_server/%s%s",
                               extension_server_port, filename, is_library ? "?is_library=true" : "");

    elog(LOG, "curl_easy_perform() url: %s", compute_ctl_url);

    curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST");
    curl_easy_setopt(curl, CURLOPT_URL, compute_ctl_url);
    curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L /* seconds */);

    if (curl)
    {
        /* Perform the request, res will get the return code */
        res = curl_easy_perform(curl);
        /* Check for errors */
        if (res == CURLE_OK)
        {
            elog(LOG, "curl_easy_perform() succeeded");
            ret = true;
        }
        else
        {
            elog(WARNING, "curl_easy_perform() failed: %s\n", curl_easy_strerror(res));
        }

        /* always cleanup */
        curl_easy_cleanup(curl);
    }

    return ret;
}

void pg_init_extension_server()
{
    DefineCustomIntVariable("neon.extension_server_port",
                            "connection string to the compute_ctl",
                            NULL,
                            &extension_server_port,
                            0, 0, INT_MAX,
                            PGC_POSTMASTER,
                            0, /* no flags required */
                            NULL, NULL, NULL);

    // set download_extension_file_hook
    prev_download_extension_file_hook = download_extension_file_hook;
    download_extension_file_hook = neon_download_extension_file_http;
}
pgxn/neon/extension_server.h (new file, 1 line)
@@ -0,0 +1 @@
@@ -35,8 +35,11 @@ _PG_init(void)
 {
     pg_init_libpagestore();
     pg_init_walproposer();
+
     InitControlPlaneConnector();
 
+    pg_init_extension_server();
+
     // Important: This must happen after other parts of the extension
     // are loaded, otherwise any settings to GUCs that were set before
     // the extension was loaded will be removed.
@@ -21,6 +21,8 @@ extern char *neon_tenant;
 extern void pg_init_libpagestore(void);
 extern void pg_init_walproposer(void);
 
+extern void pg_init_extension_server(void);
+
 /*
  * Returns true if we shouldn't do REDO on that block in record indicated by
  * block_id; false otherwise.
@@ -534,6 +534,16 @@ class S3Storage:
             "AWS_SECRET_ACCESS_KEY": self.secret_key,
         }
 
+    def to_string(self) -> str:
+        return json.dumps(
+            {
+                "bucket": self.bucket_name,
+                "region": self.bucket_region,
+                "endpoint": self.endpoint,
+                "prefix": self.prefix_in_bucket,
+            }
+        )
+
 
 RemoteStorage = Union[LocalFsStorage, S3Storage]
 
@@ -600,10 +610,12 @@ class NeonEnvBuilder:
         self.rust_log_override = rust_log_override
         self.port_distributor = port_distributor
         self.remote_storage = remote_storage
+        self.ext_remote_storage: Optional[S3Storage] = None
+        self.remote_storage_client: Optional[Any] = None
         self.remote_storage_users = remote_storage_users
         self.broker = broker
         self.run_id = run_id
-        self.mock_s3_server = mock_s3_server
+        self.mock_s3_server: MockS3Server = mock_s3_server
         self.pageserver_config_override = pageserver_config_override
         self.num_safekeepers = num_safekeepers
         self.safekeepers_id_start = safekeepers_id_start
@@ -651,15 +663,24 @@ class NeonEnvBuilder:
         remote_storage_kind: RemoteStorageKind,
         test_name: str,
         force_enable: bool = True,
+        enable_remote_extensions: bool = False,
     ):
         if remote_storage_kind == RemoteStorageKind.NOOP:
             return
         elif remote_storage_kind == RemoteStorageKind.LOCAL_FS:
             self.enable_local_fs_remote_storage(force_enable=force_enable)
         elif remote_storage_kind == RemoteStorageKind.MOCK_S3:
-            self.enable_mock_s3_remote_storage(bucket_name=test_name, force_enable=force_enable)
+            self.enable_mock_s3_remote_storage(
+                bucket_name=test_name,
+                force_enable=force_enable,
+                enable_remote_extensions=enable_remote_extensions,
+            )
         elif remote_storage_kind == RemoteStorageKind.REAL_S3:
-            self.enable_real_s3_remote_storage(test_name=test_name, force_enable=force_enable)
+            self.enable_real_s3_remote_storage(
+                test_name=test_name,
+                force_enable=force_enable,
+                enable_remote_extensions=enable_remote_extensions,
+            )
         else:
             raise RuntimeError(f"Unknown storage type: {remote_storage_kind}")
 
@@ -673,11 +694,15 @@ class NeonEnvBuilder:
         assert force_enable or self.remote_storage is None, "remote storage is enabled already"
         self.remote_storage = LocalFsStorage(Path(self.repo_dir / "local_fs_remote_storage"))
 
-    def enable_mock_s3_remote_storage(self, bucket_name: str, force_enable: bool = True):
+    def enable_mock_s3_remote_storage(
+        self, bucket_name: str, force_enable: bool = True, enable_remote_extensions: bool = False
+    ):
         """
         Sets up the pageserver to use the S3 mock server, creates the bucket, if it's not present already.
         Starts up the mock server, if that does not run yet.
         Errors, if the pageserver has some remote storage configuration already, unless `force_enable` is not set to `True`.
+
+        Also creates the bucket for extensions, self.ext_remote_storage bucket
         """
         assert force_enable or self.remote_storage is None, "remote storage is enabled already"
         mock_endpoint = self.mock_s3_server.endpoint()
@@ -698,9 +723,22 @@ class NeonEnvBuilder:
             bucket_region=mock_region,
             access_key=self.mock_s3_server.access_key(),
             secret_key=self.mock_s3_server.secret_key(),
+            prefix_in_bucket="pageserver",
         )
 
-    def enable_real_s3_remote_storage(self, test_name: str, force_enable: bool = True):
+        if enable_remote_extensions:
+            self.ext_remote_storage = S3Storage(
+                bucket_name=bucket_name,
+                endpoint=mock_endpoint,
+                bucket_region=mock_region,
+                access_key=self.mock_s3_server.access_key(),
+                secret_key=self.mock_s3_server.secret_key(),
+                prefix_in_bucket="ext",
+            )
+
+    def enable_real_s3_remote_storage(
+        self, test_name: str, force_enable: bool = True, enable_remote_extensions: bool = False
+    ):
         """
         Sets up configuration to use real s3 endpoint without mock server
         """
@@ -737,9 +775,18 @@ class NeonEnvBuilder:
             bucket_region=region,
             access_key=access_key,
             secret_key=secret_key,
-            prefix_in_bucket=self.remote_storage_prefix,
+            prefix_in_bucket=f"{self.remote_storage_prefix}/pageserver",
         )
 
+        if enable_remote_extensions:
+            self.ext_remote_storage = S3Storage(
+                bucket_name=bucket_name,
+                bucket_region=region,
+                access_key=access_key,
+                secret_key=secret_key,
+                prefix_in_bucket=f"{self.remote_storage_prefix}/ext",
+            )
+
     def cleanup_local_storage(self):
         if self.preserve_database_files:
             return
@@ -773,6 +820,7 @@ class NeonEnvBuilder:
         # `self.remote_storage_prefix` is coupled with `S3Storage` storage type,
         # so this line effectively a no-op
         assert isinstance(self.remote_storage, S3Storage)
+        assert self.remote_storage_client is not None
 
         if self.keep_remote_storage_contents:
             log.info("keep_remote_storage_contents skipping remote storage cleanup")
@@ -902,6 +950,8 @@ class NeonEnv:
         self.neon_binpath = config.neon_binpath
         self.pg_distrib_dir = config.pg_distrib_dir
         self.endpoint_counter = 0
+        self.remote_storage_client = config.remote_storage_client
+        self.ext_remote_storage = config.ext_remote_storage
 
         # generate initial tenant ID here instead of letting 'neon init' generate it,
         # so that we don't need to dig it out of the config file afterwards.
@@ -1488,6 +1538,7 @@ class NeonCli(AbstractNeonCli):
         safekeepers: Optional[List[int]] = None,
         tenant_id: Optional[TenantId] = None,
         lsn: Optional[Lsn] = None,
+        remote_ext_config: Optional[str] = None,
     ) -> "subprocess.CompletedProcess[str]":
         args = [
             "endpoint",
@@ -1497,6 +1548,8 @@ class NeonCli(AbstractNeonCli):
             "--pg-version",
             self.env.pg_version,
         ]
+        if remote_ext_config is not None:
+            args.extend(["--remote-ext-config", remote_ext_config])
         if lsn is not None:
             args.append(f"--lsn={lsn}")
         args.extend(["--pg-port", str(pg_port)])
@@ -2358,7 +2411,7 @@ class Endpoint(PgProtocol):
 
         return self
 
-    def start(self) -> "Endpoint":
+    def start(self, remote_ext_config: Optional[str] = None) -> "Endpoint":
         """
         Start the Postgres instance.
         Returns self.
@@ -2374,6 +2427,7 @@ class Endpoint(PgProtocol):
             http_port=self.http_port,
             tenant_id=self.tenant_id,
             safekeepers=self.active_safekeepers,
+            remote_ext_config=remote_ext_config,
         )
         self.running = True
 
@@ -2463,6 +2517,7 @@ class Endpoint(PgProtocol):
         hot_standby: bool = False,
         lsn: Optional[Lsn] = None,
         config_lines: Optional[List[str]] = None,
+        remote_ext_config: Optional[str] = None,
     ) -> "Endpoint":
         """
         Create an endpoint, apply config, and start Postgres.
@@ -2477,7 +2532,7 @@ class Endpoint(PgProtocol):
             config_lines=config_lines,
             hot_standby=hot_standby,
             lsn=lsn,
-        ).start()
+        ).start(remote_ext_config=remote_ext_config)
 
         log.info(f"Postgres startup took {time.time() - started_at} seconds")
 
@@ -2511,6 +2566,7 @@ class EndpointFactory:
         lsn: Optional[Lsn] = None,
         hot_standby: bool = False,
         config_lines: Optional[List[str]] = None,
+        remote_ext_config: Optional[str] = None,
     ) -> Endpoint:
         ep = Endpoint(
             self.env,
@@ -2527,6 +2583,7 @@ class EndpointFactory:
             hot_standby=hot_standby,
             config_lines=config_lines,
             lsn=lsn,
+            remote_ext_config=remote_ext_config,
         )
 
     def create(
@@ -89,6 +89,9 @@ class TenantId(Id):
     def __repr__(self) -> str:
         return f'`TenantId("{self.id.hex()}")'
 
+    def __str__(self) -> str:
+        return self.id.hex()
+
 
 class TimelineId(Id):
     def __repr__(self) -> str:
test_runner/regress/test_download_extensions.py (new file, 252 lines)
@@ -0,0 +1,252 @@
import os
from contextlib import closing
from io import BytesIO

import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, RemoteStorageKind
from fixtures.pg_version import PgVersion
from fixtures.types import TenantId

NUM_EXT = 3


def control_file_content(owner, i):
    output = f"""# mock {owner} extension{i}
comment = 'This is a mock extension'
default_version = '1.0'
module_pathname = '$libdir/test_ext{i}'
relocatable = true"""
    return output


def sql_file_content():
    output = """
        CREATE FUNCTION test_ext_add(integer, integer) RETURNS integer
        AS 'select $1 + $2;'
        LANGUAGE SQL
        IMMUTABLE
        RETURNS NULL ON NULL INPUT;
    """
    return output


# Prepare some mock extension files and upload them to the bucket
# returns a list of files that should be cleaned up after the test
def prepare_mock_ext_storage(
    pg_version: PgVersion,
    tenant_id: TenantId,
    pg_bin: PgBin,
    ext_remote_storage,
    remote_storage_client,
):
    bucket_prefix = ext_remote_storage.prefix_in_bucket
    private_prefix = str(tenant_id)
    PUB_EXT_ROOT = f"v{pg_version}/share/postgresql/extension"
    PRIVATE_EXT_ROOT = f"v{pg_version}/{private_prefix}/share/postgresql/extension"
    LOCAL_EXT_ROOT = f"pg_install/{PUB_EXT_ROOT}"

    PUB_LIB_ROOT = f"v{pg_version}/lib"
    PRIVATE_LIB_ROOT = f"v{pg_version}/{private_prefix}/lib"
    LOCAL_LIB_ROOT = f"{pg_bin.pg_lib_dir}/postgresql"

    log.info(
        f"""
        PUB_EXT_ROOT: {PUB_EXT_ROOT}
        PRIVATE_EXT_ROOT: {PRIVATE_EXT_ROOT}
        LOCAL_EXT_ROOT: {LOCAL_EXT_ROOT}
        PUB_LIB_ROOT: {PUB_LIB_ROOT}
        PRIVATE_LIB_ROOT: {PRIVATE_LIB_ROOT}
        LOCAL_LIB_ROOT: {LOCAL_LIB_ROOT}
        """
    )

    cleanup_files = []

    # Upload several test_ext{i}.control files to the bucket
    for i in range(NUM_EXT):
        public_ext = BytesIO(bytes(control_file_content("public", i), "utf-8"))
        public_remote_name = f"{bucket_prefix}/{PUB_EXT_ROOT}/test_ext{i}.control"
        public_local_name = f"{LOCAL_EXT_ROOT}/test_ext{i}.control"
        private_ext = BytesIO(bytes(control_file_content(str(tenant_id), i), "utf-8"))
        private_remote_name = f"{bucket_prefix}/{PRIVATE_EXT_ROOT}/private_ext{i}.control"
        private_local_name = f"{LOCAL_EXT_ROOT}/private_ext{i}.control"
        cleanup_files += [public_local_name, private_local_name]

        remote_storage_client.upload_fileobj(
            public_ext, ext_remote_storage.bucket_name, public_remote_name
        )
        remote_storage_client.upload_fileobj(
            private_ext, ext_remote_storage.bucket_name, private_remote_name
        )

    # Upload SQL file for the extension we're going to create
    sql_filename = "test_ext0--1.0.sql"
    test_sql_public_remote_path = f"{bucket_prefix}/{PUB_EXT_ROOT}/{sql_filename}"
    test_sql_local_path = f"{LOCAL_EXT_ROOT}/{sql_filename}"
    test_ext_sql_file = BytesIO(bytes(sql_file_content(), "utf-8"))
    remote_storage_client.upload_fileobj(
        test_ext_sql_file,
        ext_remote_storage.bucket_name,
        test_sql_public_remote_path,
    )
    cleanup_files += [test_sql_local_path]

    # upload some fake library files
    for i in range(2):
        public_library = BytesIO(bytes("\n111\n", "utf-8"))
        public_remote_name = f"{bucket_prefix}/{PUB_LIB_ROOT}/test_lib{i}.so"
        public_local_name = f"{LOCAL_LIB_ROOT}/test_lib{i}.so"
        private_library = BytesIO(bytes("\n111\n", "utf-8"))
        private_remote_name = f"{bucket_prefix}/{PRIVATE_LIB_ROOT}/private_lib{i}.so"
        private_local_name = f"{LOCAL_LIB_ROOT}/private_lib{i}.so"

        log.info(f"uploading library to {public_remote_name}")
        log.info(f"uploading library to {private_remote_name}")

        remote_storage_client.upload_fileobj(
            public_library,
            ext_remote_storage.bucket_name,
            public_remote_name,
        )
        remote_storage_client.upload_fileobj(
            private_library,
            ext_remote_storage.bucket_name,
            private_remote_name,
        )
        cleanup_files += [public_local_name, private_local_name]

    return cleanup_files


# Generate mock extension files and upload them to the bucket.
#
# Then check that compute nodes can download them and use them
# to CREATE EXTENSION and LOAD 'library.so'
#
# NOTE: You must have appropriate AWS credentials to run REAL_S3 test.
# It may also be necessary to set the following environment variables:
# export AWS_ACCESS_KEY_ID='test'
# export AWS_SECRET_ACCESS_KEY='test'
# export AWS_SECURITY_TOKEN='test'
# export AWS_SESSION_TOKEN='test'
# export AWS_DEFAULT_REGION='us-east-1'


@pytest.mark.parametrize(
    "remote_storage_kind", [RemoteStorageKind.MOCK_S3, RemoteStorageKind.REAL_S3]
)
def test_remote_extensions(
    neon_env_builder: NeonEnvBuilder,
    remote_storage_kind: RemoteStorageKind,
    pg_version: PgVersion,
    pg_bin: PgBin,
):
    neon_env_builder.enable_remote_storage(
        remote_storage_kind=remote_storage_kind,
        test_name="test_remote_extensions",
        enable_remote_extensions=True,
    )
    neon_env_builder.num_safekeepers = 3
    env = neon_env_builder.init_start()
    tenant_id, _ = env.neon_cli.create_tenant()
    env.neon_cli.create_timeline("test_remote_extensions", tenant_id=tenant_id)

    assert env.ext_remote_storage is not None
    assert env.remote_storage_client is not None

    # Prepare some mock extension files and upload them to the bucket
    cleanup_files = prepare_mock_ext_storage(
        pg_version,
        tenant_id,
        pg_bin,
        env.ext_remote_storage,
        env.remote_storage_client,
    )
    # Start a compute node and check that it can download the extensions
    # and use them to CREATE EXTENSION and LOAD 'library.so'
    #
    # This block is wrapped in a try/finally so that the downloaded files
    # are cleaned up even if the test fails
    try:
        endpoint = env.endpoints.create_start(
            "test_remote_extensions",
            tenant_id=tenant_id,
            remote_ext_config=env.ext_remote_storage.to_string(),
            # config_lines=["log_min_messages=debug3"],
        )
        with closing(endpoint.connect()) as conn:
            with conn.cursor() as cur:
                # Test query: check that test_ext0 was successfully downloaded
                cur.execute("SELECT * FROM pg_available_extensions")
                all_extensions = [x[0] for x in cur.fetchall()]
                log.info(all_extensions)
                for i in range(NUM_EXT):
                    assert f"test_ext{i}" in all_extensions
                    assert f"private_ext{i}" in all_extensions

                cur.execute("CREATE EXTENSION test_ext0")
                cur.execute("SELECT extname FROM pg_extension")
                all_extensions = [x[0] for x in cur.fetchall()]
                log.info(all_extensions)
                assert "test_ext0" in all_extensions

                # Try to load existing library file
                try:
                    cur.execute("LOAD 'test_lib0.so'")
                except Exception as e:
                    # expected to fail with
                    # could not load library ... test_ext.so: file too short
                    # because test_lib0.so is not real library file
                    log.info("LOAD test_lib0.so failed (expectedly): %s", e)
                    assert "file too short" in str(e)

                # Try to load private library file
                try:
                    cur.execute("LOAD 'private_lib0.so'")
                except Exception as e:
                    # expected to fail with
                    # could not load library ... test_ext.so: file too short
                    # because test_lib0.so is not real library file
                    log.info("LOAD private_lib0.so failed (expectedly): %s", e)
                    assert "file too short" in str(e)

                # Try to load existing library file without .so extension
                try:
                    cur.execute("LOAD 'test_lib1'")
                except Exception as e:
                    # expected to fail with
                    # could not load library ... test_lib1.so: file too short
                    # because test_lib1.so is not real library file
                    log.info("LOAD test_lib1 failed (expectedly): %s", e)
                    assert "file too short" in str(e)

                # Try to load non-existent library file
                try:
                    cur.execute("LOAD 'test_lib_fail.so'")
                except Exception as e:
                    # expected to fail because test_lib_fail.so is not found
                    log.info("LOAD test_lib_fail.so failed (expectedly): %s", e)
                    assert (
                        """could not access file "test_lib_fail.so": No such file or directory"""
                        in str(e)
                    )

    finally:
        # this is important because if the files aren't cleaned up then the test can
        # pass even without successfully downloading the files if a previous run (or
        # run with different type of remote storage) of the test did download the
        # files
        for file in cleanup_files:
            try:
                os.remove(file)
                log.info(f"Deleted {file}")
            except FileNotFoundError:
                log.info(f"{file} does not exist, so cannot be deleted")


# TODO
# @pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
# def test_remote_extensions_shared_preload_libraries(
#     neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind, pg_version: PgVersion
# ):
@@ -275,6 +275,7 @@ def assert_prefix_empty(neon_env_builder: NeonEnvBuilder, prefix: Optional[str]
     assert isinstance(neon_env_builder.remote_storage, S3Storage)
 
     # Note that this doesnt use pagination, so list is not guaranteed to be exhaustive.
+    assert neon_env_builder.remote_storage_client is not None
     response = neon_env_builder.remote_storage_client.list_objects_v2(
         Bucket=neon_env_builder.remote_storage.bucket_name,
         Prefix=prefix or neon_env_builder.remote_storage.prefix_in_bucket or "",
@@ -628,7 +629,7 @@ def test_timeline_delete_works_for_remote_smoke(
     )
 
     # for some reason the check above doesnt immediately take effect for the below.
-    # Assume it is mock server incosistency and check twice.
+    # Assume it is mock server inconsistency and check twice.
     wait_until(
         2,
         0.5,
Submodule vendor/postgres-v14 updated: a2daebc6b4...225cda0f1c
Submodule vendor/postgres-v15 updated: 2df2ce3744...ec7daf4d3d