mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-24 16:40:38 +00:00
Add support for remote extensions. When requested, downloads a tar.gz file for the extension and then organizes the contained files. For instance, placing .so files in sharelib.
This commit is contained in:
@@ -5,6 +5,8 @@
|
||||
//! - `compute_ctl` accepts cluster (compute node) specification as a JSON file.
|
||||
//! - Every start is a fresh start, so the data directory is removed and
|
||||
//! initialized again on each run.
|
||||
//! - If remote_extension_config is provided, it will be used to fetch extensions list
|
||||
//! and download `shared_preload_libraries` from the remote storage.
|
||||
//! - Next it will put configuration files into the `PGDATA` directory.
|
||||
//! - Sync safekeepers and get commit LSN.
|
||||
//! - Get `basebackup` from pageserver using the returned on the previous step LSN.
|
||||
@@ -27,7 +29,8 @@
|
||||
//! compute_ctl -D /var/db/postgres/compute \
|
||||
//! -C 'postgresql://cloud_admin@localhost/postgres' \
|
||||
//! -S /var/db/postgres/specs/current.json \
|
||||
//! -b /usr/local/bin/postgres
|
||||
//! -b /usr/local/bin/postgres \
|
||||
//! -r {"bucket": "my-bucket", "region": "eu-central-1", "endpoint": "http:://localhost:9000"} \
|
||||
//! ```
|
||||
//!
|
||||
use std::collections::HashMap;
|
||||
@@ -35,7 +38,7 @@ use std::fs::File;
|
||||
use std::panic;
|
||||
use std::path::Path;
|
||||
use std::process::exit;
|
||||
use std::sync::{mpsc, Arc, Condvar, Mutex};
|
||||
use std::sync::{mpsc, Arc, Condvar, Mutex, OnceLock};
|
||||
use std::{thread, time::Duration};
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
@@ -48,6 +51,8 @@ use compute_api::responses::ComputeStatus;
|
||||
|
||||
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
|
||||
use compute_tools::configurator::launch_configurator;
|
||||
use compute_tools::extension_server::launch_download_extensions;
|
||||
use compute_tools::extension_server::{get_pg_version, init_remote_storage};
|
||||
use compute_tools::http::api::launch_http_server;
|
||||
use compute_tools::logger::*;
|
||||
use compute_tools::monitor::launch_monitor;
|
||||
@@ -60,10 +65,21 @@ fn main() -> Result<()> {
|
||||
init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
|
||||
|
||||
let build_tag = option_env!("BUILD_TAG").unwrap_or(BUILD_TAG_DEFAULT);
|
||||
|
||||
info!("build_tag: {build_tag}");
|
||||
|
||||
let matches = cli().get_matches();
|
||||
let pgbin_default = String::from("postgres");
|
||||
let pgbin = matches.get_one::<String>("pgbin").unwrap_or(&pgbin_default);
|
||||
|
||||
let remote_ext_config = matches.get_one::<String>("remote-ext-config");
|
||||
// NOTE TODO: until control-plane changes, we can use the following line to forcibly enable remote extensions
|
||||
// let remote_ext_config = Some(
|
||||
// r#"{"bucket": "neon-dev-extensions", "region": "eu-central-1", "endpoint": null, "prefix": "5555"}"#.to_string(),
|
||||
// );
|
||||
let ext_remote_storage = remote_ext_config.map(|x| {
|
||||
init_remote_storage(x, build_tag)
|
||||
.expect("cannot initialize remote extension storage from config")
|
||||
});
|
||||
|
||||
let http_port = *matches
|
||||
.get_one::<u16>("http-port")
|
||||
@@ -128,9 +144,6 @@ fn main() -> Result<()> {
|
||||
let compute_id = matches.get_one::<String>("compute-id");
|
||||
let control_plane_uri = matches.get_one::<String>("control-plane-uri");
|
||||
|
||||
// Try to use just 'postgres' if no path is provided
|
||||
let pgbin = matches.get_one::<String>("pgbin").unwrap();
|
||||
|
||||
let spec;
|
||||
let mut live_config_allowed = false;
|
||||
match spec_json {
|
||||
@@ -168,6 +181,7 @@ fn main() -> Result<()> {
|
||||
|
||||
let mut new_state = ComputeState::new();
|
||||
let spec_set;
|
||||
|
||||
if let Some(spec) = spec {
|
||||
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
|
||||
new_state.pspec = Some(pspec);
|
||||
@@ -179,9 +193,12 @@ fn main() -> Result<()> {
|
||||
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
|
||||
pgdata: pgdata.to_string(),
|
||||
pgbin: pgbin.to_string(),
|
||||
pgversion: get_pg_version(pgbin),
|
||||
live_config_allowed,
|
||||
state: Mutex::new(new_state),
|
||||
state_changed: Condvar::new(),
|
||||
ext_remote_storage,
|
||||
available_extensions: OnceLock::new(),
|
||||
};
|
||||
let compute = Arc::new(compute_node);
|
||||
|
||||
@@ -190,6 +207,8 @@ fn main() -> Result<()> {
|
||||
let _http_handle =
|
||||
launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread");
|
||||
|
||||
let extension_server_port: u16 = http_port;
|
||||
|
||||
if !spec_set {
|
||||
// No spec provided, hang waiting for it.
|
||||
info!("no compute spec provided, waiting");
|
||||
@@ -227,10 +246,13 @@ fn main() -> Result<()> {
|
||||
let _configurator_handle =
|
||||
launch_configurator(&compute).expect("cannot launch configurator thread");
|
||||
|
||||
let _download_extensions_handle =
|
||||
launch_download_extensions(&compute).expect("cannot launch download extensions thread");
|
||||
|
||||
// Start Postgres
|
||||
let mut delay_exit = false;
|
||||
let mut exit_code = None;
|
||||
let pg = match compute.start_compute() {
|
||||
let pg = match compute.start_compute(extension_server_port) {
|
||||
Ok(pg) => Some(pg),
|
||||
Err(err) => {
|
||||
error!("could not start the compute node: {:?}", err);
|
||||
@@ -359,6 +381,12 @@ fn cli() -> clap::Command {
|
||||
.long("control-plane-uri")
|
||||
.value_name("CONTROL_PLANE_API_BASE_URI"),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("remote-ext-config")
|
||||
.short('r')
|
||||
.long("remote-ext-config")
|
||||
.value_name("REMOTE_EXT_CONFIG"),
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -1,14 +1,17 @@
|
||||
use std::collections::HashSet;
|
||||
use std::fs;
|
||||
use std::io::BufRead;
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
use std::path::Path;
|
||||
use std::process::{Command, Stdio};
|
||||
use std::str::FromStr;
|
||||
use std::sync::{Condvar, Mutex};
|
||||
use std::sync::{Condvar, Mutex, OnceLock};
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use chrono::{DateTime, Utc};
|
||||
use futures::future::join_all;
|
||||
use postgres::{Client, NoTls};
|
||||
use tokio;
|
||||
use tokio_postgres;
|
||||
use tracing::{info, instrument, warn};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
@@ -18,9 +21,11 @@ use compute_api::responses::{ComputeMetrics, ComputeStatus};
|
||||
use compute_api::spec::{ComputeMode, ComputeSpec};
|
||||
use utils::measured_stream::MeasuredReader;
|
||||
|
||||
use crate::config;
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
|
||||
use crate::pg_helpers::*;
|
||||
use crate::spec::*;
|
||||
use crate::{config, extension_server};
|
||||
|
||||
/// Compute node info shared across several `compute_ctl` threads.
|
||||
pub struct ComputeNode {
|
||||
@@ -28,6 +33,7 @@ pub struct ComputeNode {
|
||||
pub connstr: url::Url,
|
||||
pub pgdata: String,
|
||||
pub pgbin: String,
|
||||
pub pgversion: String,
|
||||
/// We should only allow live re- / configuration of the compute node if
|
||||
/// it uses 'pull model', i.e. it can go to control-plane and fetch
|
||||
/// the latest configuration. Otherwise, there could be a case:
|
||||
@@ -47,6 +53,10 @@ pub struct ComputeNode {
|
||||
pub state: Mutex<ComputeState>,
|
||||
/// `Condvar` to allow notifying waiters about state changes.
|
||||
pub state_changed: Condvar,
|
||||
/// the S3 bucket that we search for extensions in
|
||||
pub ext_remote_storage: Option<GenericRemoteStorage>,
|
||||
// cached lists of available extensions and libraries
|
||||
pub available_extensions: OnceLock<HashSet<String>>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -357,14 +367,22 @@ impl ComputeNode {
|
||||
/// Do all the preparations like PGDATA directory creation, configuration,
|
||||
/// safekeepers sync, basebackup, etc.
|
||||
#[instrument(skip_all)]
|
||||
pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
|
||||
pub fn prepare_pgdata(
|
||||
&self,
|
||||
compute_state: &ComputeState,
|
||||
extension_server_port: u16,
|
||||
) -> Result<()> {
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
let spec = &pspec.spec;
|
||||
let pgdata_path = Path::new(&self.pgdata);
|
||||
|
||||
// Remove/create an empty pgdata directory and put configuration there.
|
||||
self.create_pgdata()?;
|
||||
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &pspec.spec)?;
|
||||
config::write_postgres_conf(
|
||||
&pgdata_path.join("postgresql.conf"),
|
||||
&pspec.spec,
|
||||
Some(extension_server_port),
|
||||
)?;
|
||||
|
||||
// Syncing safekeepers is only safe with primary nodes: if a primary
|
||||
// is already connected it will be kicked out, so a secondary (standby)
|
||||
@@ -506,7 +524,7 @@ impl ComputeNode {
|
||||
|
||||
// Write new config
|
||||
let pgdata_path = Path::new(&self.pgdata);
|
||||
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec)?;
|
||||
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec, None)?;
|
||||
|
||||
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
|
||||
self.pg_reload_conf(&mut client)?;
|
||||
@@ -536,7 +554,7 @@ impl ComputeNode {
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub fn start_compute(&self) -> Result<std::process::Child> {
|
||||
pub fn start_compute(&self, extension_server_port: u16) -> Result<std::process::Child> {
|
||||
let compute_state = self.state.lock().unwrap().clone();
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
info!(
|
||||
@@ -547,7 +565,26 @@ impl ComputeNode {
|
||||
pspec.timeline_id,
|
||||
);
|
||||
|
||||
self.prepare_pgdata(&compute_state)?;
|
||||
// This part is sync, because we need to download
|
||||
// remote shared_preload_libraries before postgres start (if any)
|
||||
{
|
||||
let library_load_start_time = Utc::now();
|
||||
self.prepare_preload_libraries(&compute_state)?;
|
||||
|
||||
let library_load_time = Utc::now()
|
||||
.signed_duration_since(library_load_start_time)
|
||||
.to_std()
|
||||
.unwrap()
|
||||
.as_millis() as u64;
|
||||
let mut state = self.state.lock().unwrap();
|
||||
state.metrics.load_libraries_ms = library_load_time;
|
||||
info!(
|
||||
"Loading shared_preload_libraries took {:?}ms",
|
||||
library_load_time
|
||||
);
|
||||
}
|
||||
|
||||
self.prepare_pgdata(&compute_state, extension_server_port)?;
|
||||
|
||||
let start_time = Utc::now();
|
||||
let pg = self.start_postgres(pspec.storage_auth_token.clone())?;
|
||||
@@ -695,4 +732,92 @@ LIMIT 100",
|
||||
"{{\"pg_stat_statements\": []}}".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
// If remote extension storage is configured,
|
||||
// download extension control files
|
||||
#[tokio::main]
|
||||
pub async fn prepare_external_extensions(&self, compute_state: &ComputeState) -> Result<()> {
|
||||
if let Some(ref ext_remote_storage) = self.ext_remote_storage {
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
let spec = &pspec.spec;
|
||||
let custom_ext_prefixes = spec.custom_extensions.clone().unwrap_or(Vec::new());
|
||||
info!("custom_ext_prefixes: {:?}", &custom_ext_prefixes);
|
||||
let available_extensions = extension_server::get_available_extensions(
|
||||
ext_remote_storage,
|
||||
&self.pgbin,
|
||||
&self.pgversion,
|
||||
&custom_ext_prefixes,
|
||||
)
|
||||
.await?;
|
||||
self.available_extensions
|
||||
.set(available_extensions)
|
||||
.expect("available_extensions.set error");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn download_extension(&self, ext_name: &str) -> Result<()> {
|
||||
match &self.ext_remote_storage {
|
||||
None => anyhow::bail!("No remote extension storage"),
|
||||
Some(remote_storage) => {
|
||||
extension_server::download_extension(
|
||||
ext_name,
|
||||
remote_storage,
|
||||
&self.pgbin,
|
||||
&self.pgversion,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
pub async fn prepare_preload_libraries(&self, compute_state: &ComputeState) -> Result<()> {
|
||||
if self.ext_remote_storage.is_none() {
|
||||
return Ok(());
|
||||
}
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
let spec = &pspec.spec;
|
||||
|
||||
info!("parse shared_preload_libraries from spec.cluster.settings");
|
||||
let mut libs_vec = Vec::new();
|
||||
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
|
||||
libs_vec = libs
|
||||
.split(&[',', '\'', ' '])
|
||||
.filter(|s| *s != "neon" && !s.is_empty())
|
||||
.map(str::to_string)
|
||||
.collect();
|
||||
}
|
||||
info!("parse shared_preload_libraries from provided postgresql.conf");
|
||||
// that is used in neon_local and python tests
|
||||
if let Some(conf) = &spec.cluster.postgresql_conf {
|
||||
let conf_lines = conf.split('\n').collect::<Vec<&str>>();
|
||||
let mut shared_preload_libraries_line = "";
|
||||
for line in conf_lines {
|
||||
if line.starts_with("shared_preload_libraries") {
|
||||
shared_preload_libraries_line = line;
|
||||
}
|
||||
}
|
||||
let mut preload_libs_vec = Vec::new();
|
||||
if let Some(libs) = shared_preload_libraries_line.split("='").nth(1) {
|
||||
preload_libs_vec = libs
|
||||
.split(&[',', '\'', ' '])
|
||||
.filter(|s| *s != "neon" && !s.is_empty())
|
||||
.map(str::to_string)
|
||||
.collect();
|
||||
}
|
||||
libs_vec.extend(preload_libs_vec);
|
||||
}
|
||||
|
||||
info!("Downloading to shared preload libraries: {:?}", &libs_vec);
|
||||
let mut download_tasks = Vec::new();
|
||||
for library in &libs_vec {
|
||||
download_tasks.push(self.download_extension(library));
|
||||
}
|
||||
let results = join_all(download_tasks).await;
|
||||
for result in results {
|
||||
result?; // propogate any errors
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,7 +33,11 @@ pub fn line_in_file(path: &Path, line: &str) -> Result<bool> {
|
||||
}
|
||||
|
||||
/// Create or completely rewrite configuration file specified by `path`
|
||||
pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> {
|
||||
pub fn write_postgres_conf(
|
||||
path: &Path,
|
||||
spec: &ComputeSpec,
|
||||
extension_server_port: Option<u16>,
|
||||
) -> Result<()> {
|
||||
// File::create() destroys the file content if it exists.
|
||||
let mut file = File::create(path)?;
|
||||
|
||||
@@ -87,5 +91,9 @@ pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> {
|
||||
writeln!(file, "# Managed by compute_ctl: end")?;
|
||||
}
|
||||
|
||||
if let Some(port) = extension_server_port {
|
||||
writeln!(file, "neon.extension_server_port={}", port)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -42,13 +42,15 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn launch_configurator(compute: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
|
||||
pub fn launch_configurator(
|
||||
compute: &Arc<ComputeNode>,
|
||||
) -> Result<thread::JoinHandle<()>, std::io::Error> {
|
||||
let compute = Arc::clone(compute);
|
||||
|
||||
Ok(thread::Builder::new()
|
||||
thread::Builder::new()
|
||||
.name("compute-configurator".into())
|
||||
.spawn(move || {
|
||||
configurator_main_loop(&compute);
|
||||
info!("configurator thread is exited");
|
||||
})?)
|
||||
})
|
||||
}
|
||||
|
||||
237
compute_tools/src/extension_server.rs
Normal file
237
compute_tools/src/extension_server.rs
Normal file
@@ -0,0 +1,237 @@
|
||||
// Download extension files from the extension store
|
||||
// and put them in the right place in the postgres directory
|
||||
/*
|
||||
The layout of the S3 bucket is as follows:
|
||||
|
||||
v14/ext_index.json
|
||||
-- this contains information necessary to create control files
|
||||
v14/extensions/test_ext1.tar.gz
|
||||
-- this contains the library files and sql files necessary to create this extension
|
||||
v14/extensions/custom_ext1.tar.gz
|
||||
|
||||
The difference between a private and public extensions is determined by who can
|
||||
load the extension this is specified in ext_index.json
|
||||
|
||||
Speicially, ext_index.json has a list of public extensions, and a list of
|
||||
extensions enabled for specific tenant-ids.
|
||||
*/
|
||||
use crate::compute::ComputeNode;
|
||||
use anyhow::Context;
|
||||
use anyhow::{self, Result};
|
||||
use flate2::read::GzDecoder;
|
||||
use remote_storage::*;
|
||||
use serde_json::{self, Value};
|
||||
use std::collections::HashSet;
|
||||
use std::num::{NonZeroU32, NonZeroUsize};
|
||||
use std::path::Path;
|
||||
use std::str;
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use tar::Archive;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tracing::info;
|
||||
|
||||
fn get_pg_config(argument: &str, pgbin: &str) -> String {
|
||||
// gives the result of `pg_config [argument]`
|
||||
// where argument is a flag like `--version` or `--sharedir`
|
||||
let pgconfig = pgbin
|
||||
.strip_suffix("postgres")
|
||||
.expect("bad pgbin")
|
||||
.to_owned()
|
||||
+ "/pg_config";
|
||||
let config_output = std::process::Command::new(pgconfig)
|
||||
.arg(argument)
|
||||
.output()
|
||||
.expect("pg_config error");
|
||||
std::str::from_utf8(&config_output.stdout)
|
||||
.expect("pg_config error")
|
||||
.trim()
|
||||
.to_string()
|
||||
}
|
||||
|
||||
pub fn get_pg_version(pgbin: &str) -> String {
|
||||
// pg_config --version returns a (platform specific) human readable string
|
||||
// such as "PostgreSQL 15.4". We parse this to v14/v15
|
||||
let human_version = get_pg_config("--version", pgbin);
|
||||
if human_version.contains("15") {
|
||||
return "v15".to_string();
|
||||
} else if human_version.contains("14") {
|
||||
return "v14".to_string();
|
||||
}
|
||||
panic!("Unsuported postgres version {human_version}");
|
||||
}
|
||||
|
||||
// download extension control files
|
||||
// if custom_ext_prefixes is provided - search also in custom extension paths
|
||||
pub async fn get_available_extensions(
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
pgbin: &str,
|
||||
pg_version: &str,
|
||||
custom_ext_prefixes: &[String],
|
||||
) -> Result<HashSet<String>> {
|
||||
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
|
||||
let index_path = pg_version.to_owned() + "/ext_index.json";
|
||||
let index_path = RemotePath::new(Path::new(&index_path)).context("error forming path")?;
|
||||
info!("download ext_index.json: {:?}", &index_path);
|
||||
|
||||
// TODO: potential optimization: cache ext_index.json
|
||||
let mut download = remote_storage.download(&index_path).await?;
|
||||
let mut write_data_buffer = Vec::new();
|
||||
download
|
||||
.download_stream
|
||||
.read_to_end(&mut write_data_buffer)
|
||||
.await?;
|
||||
let ext_index_str = match str::from_utf8(&write_data_buffer) {
|
||||
Ok(v) => v,
|
||||
Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
|
||||
};
|
||||
|
||||
let ext_index_full: Value = serde_json::from_str(ext_index_str)?;
|
||||
let ext_index_full = ext_index_full.as_object().context("error parsing json")?;
|
||||
let control_data = ext_index_full["control_data"]
|
||||
.as_object()
|
||||
.context("json parse error")?;
|
||||
let enabled_extensions = ext_index_full["enabled_extensions"]
|
||||
.as_object()
|
||||
.context("json parse error")?;
|
||||
info!("{:?}", control_data.clone());
|
||||
info!("{:?}", enabled_extensions.clone());
|
||||
|
||||
let mut prefixes = vec!["public".to_string()];
|
||||
prefixes.extend(custom_ext_prefixes.to_owned());
|
||||
info!("{:?}", &prefixes);
|
||||
let mut all_extensions = HashSet::new();
|
||||
for prefix in prefixes {
|
||||
let prefix_extensions = match enabled_extensions.get(&prefix) {
|
||||
Some(Value::Array(ext_name)) => ext_name,
|
||||
_ => {
|
||||
info!("prefix {} has no extensions", prefix);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
info!("{:?}", prefix_extensions);
|
||||
for ext_name in prefix_extensions {
|
||||
all_extensions.insert(ext_name.as_str().context("json parse error")?.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
for prefix in &all_extensions {
|
||||
let control_contents = control_data[prefix].as_str().context("json parse error")?;
|
||||
let control_path = local_sharedir.join(prefix.to_owned() + ".control");
|
||||
|
||||
info!("WRITING FILE {:?}{:?}", control_path, control_contents);
|
||||
std::fs::write(control_path, control_contents)?;
|
||||
}
|
||||
|
||||
Ok(all_extensions.into_iter().collect())
|
||||
}
|
||||
|
||||
// download all sqlfiles (and possibly data files) for a given extension name
|
||||
pub async fn download_extension(
|
||||
ext_name: &str,
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
pgbin: &str,
|
||||
pg_version: &str,
|
||||
) -> Result<()> {
|
||||
// TODO: potential optimization: only download the extension if it doesn't exist
|
||||
// problem: how would we tell if it exists?
|
||||
let ext_name = ext_name.replace(".so", "");
|
||||
let ext_name_targz = ext_name.to_owned() + ".tar.gz";
|
||||
if Path::new(&ext_name_targz).exists() {
|
||||
info!("extension {:?} already exists", ext_name_targz);
|
||||
return Ok(());
|
||||
}
|
||||
let ext_path = RemotePath::new(
|
||||
&Path::new(pg_version)
|
||||
.join("extensions")
|
||||
.join(ext_name_targz.clone()),
|
||||
)?;
|
||||
info!(
|
||||
"Start downloading extension {:?} from {:?}",
|
||||
ext_name, ext_path
|
||||
);
|
||||
let mut download = remote_storage.download(&ext_path).await?;
|
||||
let mut write_data_buffer = Vec::new();
|
||||
download
|
||||
.download_stream
|
||||
.read_to_end(&mut write_data_buffer)
|
||||
.await?;
|
||||
let unzip_dest = pgbin.strip_suffix("/bin/postgres").expect("bad pgbin");
|
||||
let tar = GzDecoder::new(std::io::Cursor::new(write_data_buffer));
|
||||
let mut archive = Archive::new(tar);
|
||||
archive.unpack(unzip_dest)?;
|
||||
info!("Download + unzip {:?} completed successfully", &ext_path);
|
||||
|
||||
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
|
||||
let zip_sharedir = format!("{unzip_dest}/extensions/{ext_name}/share/extension");
|
||||
info!("mv {zip_sharedir:?}/* {local_sharedir:?}");
|
||||
for file in std::fs::read_dir(zip_sharedir)? {
|
||||
let old_file = file?.path();
|
||||
let new_file =
|
||||
Path::new(&local_sharedir).join(old_file.file_name().context("error parsing file")?);
|
||||
std::fs::rename(old_file, new_file)?;
|
||||
}
|
||||
let local_libdir = Path::new(&get_pg_config("--libdir", pgbin)).join("postgresql");
|
||||
let zip_libdir = format!("{unzip_dest}/extensions/{ext_name}/lib");
|
||||
info!("mv {zip_libdir:?}/* {local_libdir:?}");
|
||||
for file in std::fs::read_dir(zip_libdir)? {
|
||||
let old_file = file?.path();
|
||||
let new_file =
|
||||
Path::new(&local_libdir).join(old_file.file_name().context("error parsing file")?);
|
||||
std::fs::rename(old_file, new_file)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// This function initializes the necessary structs to use remmote storage (should be fairly cheap)
|
||||
pub fn init_remote_storage(
|
||||
remote_ext_config: &str,
|
||||
default_prefix: &str,
|
||||
) -> anyhow::Result<GenericRemoteStorage> {
|
||||
let remote_ext_config: serde_json::Value = serde_json::from_str(remote_ext_config)?;
|
||||
|
||||
let remote_ext_bucket = remote_ext_config["bucket"]
|
||||
.as_str()
|
||||
.context("config parse error")?;
|
||||
let remote_ext_region = remote_ext_config["region"]
|
||||
.as_str()
|
||||
.context("config parse error")?;
|
||||
let remote_ext_endpoint = remote_ext_config["endpoint"].as_str();
|
||||
let remote_ext_prefix = remote_ext_config["prefix"]
|
||||
.as_str()
|
||||
.unwrap_or(default_prefix)
|
||||
.to_string();
|
||||
|
||||
// TODO: potentially allow modification of other parameters
|
||||
// however, default values should be fine for now
|
||||
let config = S3Config {
|
||||
bucket_name: remote_ext_bucket.to_string(),
|
||||
bucket_region: remote_ext_region.to_string(),
|
||||
prefix_in_bucket: Some(remote_ext_prefix),
|
||||
endpoint: remote_ext_endpoint.map(|x| x.to_string()),
|
||||
concurrency_limit: NonZeroUsize::new(100).expect("100 != 0"),
|
||||
max_keys_per_list_response: None,
|
||||
};
|
||||
let config = RemoteStorageConfig {
|
||||
max_concurrent_syncs: NonZeroUsize::new(100).expect("100 != 0"),
|
||||
max_sync_errors: NonZeroU32::new(100).expect("100 != 0"),
|
||||
storage: RemoteStorageKind::AwsS3(config),
|
||||
};
|
||||
GenericRemoteStorage::from_config(&config)
|
||||
}
|
||||
|
||||
pub fn launch_download_extensions(
|
||||
compute: &Arc<ComputeNode>,
|
||||
) -> Result<thread::JoinHandle<()>, std::io::Error> {
|
||||
let compute = Arc::clone(compute);
|
||||
thread::Builder::new()
|
||||
.name("download-extensions".into())
|
||||
.spawn(move || {
|
||||
info!("start download_extension_files");
|
||||
let compute_state = compute.state.lock().expect("error unlocking compute.state");
|
||||
compute
|
||||
.prepare_external_extensions(&compute_state)
|
||||
.expect("error preparing extensions");
|
||||
info!("download_extension_files done, exiting thread");
|
||||
})
|
||||
}
|
||||
@@ -121,6 +121,27 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
|
||||
}
|
||||
}
|
||||
|
||||
// download extension files from S3 on demand
|
||||
(&Method::POST, route) if route.starts_with("/extension_server/") => {
|
||||
info!("serving {:?} POST request", route);
|
||||
info!("req.uri {:?}", req.uri());
|
||||
let filename = route.split('/').last().unwrap().to_string();
|
||||
info!(
|
||||
"serving /extension_server POST request, filename: {:?}",
|
||||
&filename
|
||||
);
|
||||
|
||||
match compute.download_extension(&filename).await {
|
||||
Ok(_) => Response::new(Body::from("OK")),
|
||||
Err(e) => {
|
||||
error!("extension download failed: {}", e);
|
||||
let mut resp = Response::new(Body::from(e.to_string()));
|
||||
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
||||
resp
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Return the `404 Not Found` for any other routes.
|
||||
_ => {
|
||||
let mut not_found = Response::new(Body::from("404 Not Found"));
|
||||
|
||||
@@ -139,6 +139,34 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/GenericError"
|
||||
/extension_server:
|
||||
post:
|
||||
tags:
|
||||
- Extension
|
||||
summary: Download extension from S3 to local folder.
|
||||
description: ""
|
||||
operationId: downloadExtension
|
||||
responses:
|
||||
200:
|
||||
description: Extension downloaded
|
||||
content:
|
||||
text/plain:
|
||||
schema:
|
||||
type: string
|
||||
description: Error text or 'OK' if download succeeded.
|
||||
example: "OK"
|
||||
400:
|
||||
description: Request is invalid.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/GenericError"
|
||||
500:
|
||||
description: Extension download request failed.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/GenericError"
|
||||
|
||||
components:
|
||||
securitySchemes:
|
||||
|
||||
@@ -9,6 +9,7 @@ pub mod http;
|
||||
#[macro_use]
|
||||
pub mod logger;
|
||||
pub mod compute;
|
||||
pub mod extension_server;
|
||||
pub mod monitor;
|
||||
pub mod params;
|
||||
pub mod pg_helpers;
|
||||
|
||||
@@ -105,10 +105,10 @@ fn watch_compute_activity(compute: &ComputeNode) {
|
||||
}
|
||||
|
||||
/// Launch a separate compute monitor thread and return its `JoinHandle`.
|
||||
pub fn launch_monitor(state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
|
||||
pub fn launch_monitor(state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>, std::io::Error> {
|
||||
let state = Arc::clone(state);
|
||||
|
||||
Ok(thread::Builder::new()
|
||||
thread::Builder::new()
|
||||
.name("compute-monitor".into())
|
||||
.spawn(move || watch_compute_activity(&state))?)
|
||||
.spawn(move || watch_compute_activity(&state))
|
||||
}
|
||||
|
||||
@@ -124,7 +124,7 @@ pub fn get_spec_from_control_plane(
|
||||
pub fn handle_configuration(spec: &ComputeSpec, pgdata_path: &Path) -> Result<()> {
|
||||
// File `postgresql.conf` is no longer included into `basebackup`, so just
|
||||
// always write all config into it creating new file.
|
||||
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec)?;
|
||||
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec, None)?;
|
||||
|
||||
update_pg_hba(pgdata_path)?;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user