mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-27 07:10:37 +00:00
Compare commits
3 Commits
extension_
...
extension_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3d402f39e6 | ||
|
|
7e4b55a933 | ||
|
|
681ed9261e |
@@ -51,6 +51,7 @@ use compute_api::responses::ComputeStatus;
|
||||
|
||||
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
|
||||
use compute_tools::configurator::launch_configurator;
|
||||
use compute_tools::extension_server::launch_download_extensions;
|
||||
use compute_tools::extension_server::{get_pg_version, init_remote_storage};
|
||||
use compute_tools::http::api::launch_http_server;
|
||||
use compute_tools::logger::*;
|
||||
@@ -243,6 +244,9 @@ fn main() -> Result<()> {
|
||||
let _configurator_handle =
|
||||
launch_configurator(&compute).expect("cannot launch configurator thread");
|
||||
|
||||
let _download_extensions_handle =
|
||||
launch_download_extensions(&compute).expect("cannot launch download extensions thread");
|
||||
|
||||
// Start Postgres
|
||||
let mut delay_exit = false;
|
||||
let mut exit_code = None;
|
||||
|
||||
@@ -532,10 +532,25 @@ impl ComputeNode {
|
||||
pspec.timeline_id,
|
||||
);
|
||||
|
||||
// TODO FIXME: this should not be blocking here
|
||||
// Maybe we can run it as a child process?
|
||||
// Also, worth measuring how long this step is taking
|
||||
self.prepare_external_extensions(&compute_state)?;
|
||||
// This part is sync, because we need to download
|
||||
// remote shared_preload_libraries before postgres start (if any)
|
||||
let library_load_start_time = Utc::now();
|
||||
{
|
||||
self.prepare_extenal_libraries(&compute_state)?;
|
||||
|
||||
let library_load_time = Utc::now()
|
||||
.signed_duration_since(library_load_start_time)
|
||||
.to_std()
|
||||
.unwrap()
|
||||
.as_millis() as u64;
|
||||
|
||||
let mut state = self.state.lock().unwrap();
|
||||
state.metrics.load_libraries_ms = library_load_time;
|
||||
info!(
|
||||
"Loading shared_preload_libraries took {:?}ms",
|
||||
library_load_time
|
||||
);
|
||||
}
|
||||
|
||||
self.prepare_pgdata(&compute_state, extension_server_port)?;
|
||||
|
||||
@@ -675,26 +690,23 @@ LIMIT 100",
|
||||
}
|
||||
|
||||
// If remote extension storage is configured,
|
||||
// download extension control files
|
||||
// and shared preload libraries.
|
||||
// download shared preload libraries.
|
||||
#[tokio::main]
|
||||
pub async fn prepare_external_extensions(&self, compute_state: &ComputeState) -> Result<()> {
|
||||
pub async fn prepare_extenal_libraries(&self, compute_state: &ComputeState) -> Result<()> {
|
||||
if let Some(ref ext_remote_storage) = self.ext_remote_storage {
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
// download preload shared libraries before postgres start (if any)
|
||||
let spec = &pspec.spec;
|
||||
|
||||
// 1. parse custom extension paths from spec
|
||||
let mut custom_ext_prefixes = match &spec.custom_extensions {
|
||||
let custom_ext_prefixes = match &spec.custom_extensions {
|
||||
Some(custom_extensions) => custom_extensions.clone(),
|
||||
None => Vec::new(),
|
||||
};
|
||||
// TODO actually add this to spec
|
||||
custom_ext_prefixes.push("anon".to_string());
|
||||
|
||||
info!("custom_ext_prefixes: {:?}", &custom_ext_prefixes);
|
||||
|
||||
// 2. parse shared_preload_libraries from spec
|
||||
// parse shared_preload_libraries from spec
|
||||
let mut libs_vec = Vec::new();
|
||||
|
||||
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
|
||||
@@ -740,33 +752,55 @@ LIMIT 100",
|
||||
}
|
||||
|
||||
info!("Libraries to download: {:?}", &libs_vec);
|
||||
// download extension control files & shared_preload_libraries
|
||||
let get_extensions_task = extension_server::get_available_extensions(
|
||||
ext_remote_storage,
|
||||
&self.pgbin,
|
||||
&self.pgversion,
|
||||
&custom_ext_prefixes,
|
||||
);
|
||||
let get_libraries_task = extension_server::get_available_libraries(
|
||||
// download shared_preload_libraries
|
||||
let available_libraries = extension_server::get_available_libraries(
|
||||
ext_remote_storage,
|
||||
&self.pgbin,
|
||||
&self.pgversion,
|
||||
&custom_ext_prefixes,
|
||||
&libs_vec,
|
||||
);
|
||||
)
|
||||
.await?;
|
||||
|
||||
let (available_extensions, available_libraries) =
|
||||
tokio::join!(get_extensions_task, get_libraries_task);
|
||||
self.available_extensions
|
||||
.set(available_extensions?)
|
||||
.expect("available_extensions.set error");
|
||||
self.available_libraries
|
||||
.set(available_libraries?)
|
||||
.set(available_libraries)
|
||||
.expect("available_libraries.set error");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// If remote extension storage is configured,
|
||||
// download extension control files
|
||||
#[tokio::main]
|
||||
pub async fn prepare_external_extensions(&self, compute_state: &ComputeState) -> Result<()> {
|
||||
if let Some(ref ext_remote_storage) = self.ext_remote_storage {
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
let spec = &pspec.spec;
|
||||
|
||||
// 1. parse custom extension paths from spec
|
||||
let custom_ext_prefixes = match &spec.custom_extensions {
|
||||
Some(custom_extensions) => custom_extensions.clone(),
|
||||
None => Vec::new(),
|
||||
};
|
||||
|
||||
info!("custom_ext_prefixes: {:?}", &custom_ext_prefixes);
|
||||
|
||||
// download extension control files
|
||||
let available_extensions = extension_server::get_available_extensions(
|
||||
ext_remote_storage,
|
||||
&self.pgbin,
|
||||
&self.pgversion,
|
||||
&custom_ext_prefixes,
|
||||
)
|
||||
.await?;
|
||||
|
||||
self.available_extensions
|
||||
.set(available_extensions)
|
||||
.expect("available_extensions.set error");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn download_extension_files(&self, filename: String) -> Result<()> {
|
||||
match &self.ext_remote_storage {
|
||||
None => anyhow::bail!("No remote extension storage"),
|
||||
|
||||
@@ -42,13 +42,15 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn launch_configurator(compute: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
|
||||
pub fn launch_configurator(
|
||||
compute: &Arc<ComputeNode>,
|
||||
) -> Result<thread::JoinHandle<()>, std::io::Error> {
|
||||
let compute = Arc::clone(compute);
|
||||
|
||||
Ok(thread::Builder::new()
|
||||
thread::Builder::new()
|
||||
.name("compute-configurator".into())
|
||||
.spawn(move || {
|
||||
configurator_main_loop(&compute);
|
||||
info!("configurator thread is exited");
|
||||
})?)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
// Download extension files from the extension store
|
||||
// and put them in the right place in the postgres directory
|
||||
use crate::compute::ComputeNode;
|
||||
use anyhow::{self, bail, Context, Result};
|
||||
use futures::future::join_all;
|
||||
use remote_storage::*;
|
||||
@@ -10,6 +11,8 @@ use std::io::{BufWriter, Write};
|
||||
use std::num::{NonZeroU32, NonZeroUsize};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::str;
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tracing::info;
|
||||
|
||||
@@ -426,3 +429,19 @@ async fn organized_extension_files(
|
||||
}
|
||||
Ok((grouped_dependencies, control_files))
|
||||
}
|
||||
|
||||
pub fn launch_download_extensions(
|
||||
compute: &Arc<ComputeNode>,
|
||||
) -> Result<thread::JoinHandle<()>, std::io::Error> {
|
||||
let compute = Arc::clone(compute);
|
||||
thread::Builder::new()
|
||||
.name("download-extensions".into())
|
||||
.spawn(move || {
|
||||
info!("start download_extension_files");
|
||||
let compute_state = compute.state.lock().expect("error unlocking compute.state");
|
||||
compute
|
||||
.prepare_external_extensions(&compute_state)
|
||||
.expect("error preparing extensions");
|
||||
info!("download_extension_files done, exiting thread");
|
||||
})
|
||||
}
|
||||
|
||||
@@ -105,10 +105,10 @@ fn watch_compute_activity(compute: &ComputeNode) {
|
||||
}
|
||||
|
||||
/// Launch a separate compute monitor thread and return its `JoinHandle`.
|
||||
pub fn launch_monitor(state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
|
||||
pub fn launch_monitor(state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>, std::io::Error> {
|
||||
let state = Arc::clone(state);
|
||||
|
||||
Ok(thread::Builder::new()
|
||||
thread::Builder::new()
|
||||
.name("compute-monitor".into())
|
||||
.spawn(move || watch_compute_activity(&state))?)
|
||||
.spawn(move || watch_compute_activity(&state))
|
||||
}
|
||||
|
||||
@@ -73,6 +73,7 @@ pub struct ComputeMetrics {
|
||||
pub basebackup_ms: u64,
|
||||
pub config_ms: u64,
|
||||
pub total_startup_ms: u64,
|
||||
pub load_libraries_ms: u64,
|
||||
}
|
||||
|
||||
/// Response of the `/computes/{compute_id}/spec` control-plane API.
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import json
|
||||
import os
|
||||
from contextlib import closing
|
||||
from io import BytesIO
|
||||
@@ -55,7 +54,7 @@ def prepare_mock_ext_storage(
|
||||
|
||||
PUB_EXT_ROOT = f"v{pg_version}/share/extension"
|
||||
PRIVATE_EXT_ROOT = f"v{pg_version}/{custom_prefix}/share/extension"
|
||||
LOCAL_EXT_ROOT = f"pg_install/{pg_version}/share/postgresql/extension"
|
||||
LOCAL_EXT_ROOT = f"pg_install/v{pg_version}/share/postgresql/extension"
|
||||
|
||||
PUB_LIB_ROOT = f"v{pg_version}/lib"
|
||||
PRIVATE_LIB_ROOT = f"v{pg_version}/{custom_prefix}/lib"
|
||||
@@ -249,75 +248,3 @@ def test_remote_extensions(
|
||||
log.info(f"Deleted {file}")
|
||||
except FileNotFoundError:
|
||||
log.info(f"{file} does not exist, so cannot be deleted")
|
||||
|
||||
|
||||
"""
|
||||
This tests against the actual infra for real S3
|
||||
Note in particular that we don't need to set up a bucket (real or mock)
|
||||
because we are testing the files already uploaded as part of CI/CD
|
||||
"""
|
||||
|
||||
|
||||
def test_remote_extensions_in_bucket(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=RemoteStorageKind.REAL_S3,
|
||||
test_name="test_remote_extensions_in_bucket",
|
||||
enable_remote_extensions=False, # we don't enable remote extensions here; instead we use the real bucket
|
||||
)
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id, _ = env.neon_cli.create_tenant()
|
||||
env.neon_cli.create_timeline("test_remote_extensions_in_bucket", tenant_id=tenant_id)
|
||||
|
||||
# Start a compute node and check that it can download the extensions
|
||||
# and use them to CREATE EXTENSION and LOAD 'library.so'
|
||||
remote_ext_config = {
|
||||
"bucket": "neon-dev-extensions-us-east-2",
|
||||
"region": "us-east-2",
|
||||
"endpoint": None,
|
||||
"prefix": "5412197734", # build tag
|
||||
}
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_remote_extensions_in_bucket",
|
||||
tenant_id=tenant_id,
|
||||
remote_ext_config=json.dumps(remote_ext_config),
|
||||
config_lines=["shared_preload_libraries='anon, neon'"],
|
||||
)
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# test create extension
|
||||
cur.execute("CREATE EXTENSION pg_surgery")
|
||||
cur.execute("create extension hnsw")
|
||||
cur.execute("SELECT extname FROM pg_extension")
|
||||
all_extensions = [x[0] for x in cur.fetchall()]
|
||||
assert "pg_surgery" in all_extensions
|
||||
assert "hnsw" in all_extensions
|
||||
|
||||
# test load library
|
||||
cur.execute("LOAD 'hnsw'")
|
||||
|
||||
# test load library with .so.3.14 extension.
|
||||
# Note: it should download the appropriate files, but error for the
|
||||
# specified reason
|
||||
try:
|
||||
cur.execute("LOAD 'libpgtypes'")
|
||||
except Exception as err:
|
||||
correct_err_type = (
|
||||
"Extension libraries are required to use the PG_MODULE_MAGIC macro." in str(err)
|
||||
)
|
||||
assert correct_err_type
|
||||
log.info(err)
|
||||
|
||||
# test load extension with dependencies in a subdirectory
|
||||
try:
|
||||
cur.execute("CREATE EXTENSION anon")
|
||||
except Exception as e:
|
||||
# Check that this errors, but for the right reason
|
||||
# (that it is missing dependencies, not that files failed to download)
|
||||
log.info(e)
|
||||
assert 'required extension "pgcrypto" is not installed' in str(e)
|
||||
|
||||
# test load exension with dependencies not in a subdirectory
|
||||
cur.execute("CREATE EXTENSION fuzzystrmatch")
|
||||
|
||||
log.info("Please MANUALLY cleanup any downloaded files")
|
||||
|
||||
Reference in New Issue
Block a user