Compare commits

..

3 Commits

Author SHA1 Message Date
Alek Westover
3d402f39e6 cleaner error propagation in thread creation 2023-07-05 09:56:03 -04:00
Anastasia Lubennikova
7e4b55a933 optimize extension download:
- move extension download to a separate thread;
- add timer around shared preload libraries downloading
2023-07-05 15:04:16 +03:00
Anastasia Lubennikova
681ed9261e fix cleanup of test_remote_extensions 2023-07-04 21:15:54 +03:00
7 changed files with 93 additions and 106 deletions

View File

@@ -51,6 +51,7 @@ use compute_api::responses::ComputeStatus;
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_tools::configurator::launch_configurator;
use compute_tools::extension_server::launch_download_extensions;
use compute_tools::extension_server::{get_pg_version, init_remote_storage};
use compute_tools::http::api::launch_http_server;
use compute_tools::logger::*;
@@ -243,6 +244,9 @@ fn main() -> Result<()> {
let _configurator_handle =
launch_configurator(&compute).expect("cannot launch configurator thread");
let _download_extensions_handle =
launch_download_extensions(&compute).expect("cannot launch download extensions thread");
// Start Postgres
let mut delay_exit = false;
let mut exit_code = None;

View File

@@ -532,10 +532,25 @@ impl ComputeNode {
pspec.timeline_id,
);
// TODO FIXME: this should not be blocking here
// Maybe we can run it as a child process?
// Also, worth measuring how long this step is taking
self.prepare_external_extensions(&compute_state)?;
// This part is sync, because we need to download
// remote shared_preload_libraries before postgres start (if any)
let library_load_start_time = Utc::now();
{
self.prepare_extenal_libraries(&compute_state)?;
let library_load_time = Utc::now()
.signed_duration_since(library_load_start_time)
.to_std()
.unwrap()
.as_millis() as u64;
let mut state = self.state.lock().unwrap();
state.metrics.load_libraries_ms = library_load_time;
info!(
"Loading shared_preload_libraries took {:?}ms",
library_load_time
);
}
self.prepare_pgdata(&compute_state, extension_server_port)?;
@@ -675,26 +690,23 @@ LIMIT 100",
}
// If remote extension storage is configured,
// download extension control files
// and shared preload libraries.
// download shared preload libraries.
#[tokio::main]
pub async fn prepare_external_extensions(&self, compute_state: &ComputeState) -> Result<()> {
pub async fn prepare_extenal_libraries(&self, compute_state: &ComputeState) -> Result<()> {
if let Some(ref ext_remote_storage) = self.ext_remote_storage {
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
// download preload shared libraries before postgres start (if any)
let spec = &pspec.spec;
// 1. parse custom extension paths from spec
let mut custom_ext_prefixes = match &spec.custom_extensions {
let custom_ext_prefixes = match &spec.custom_extensions {
Some(custom_extensions) => custom_extensions.clone(),
None => Vec::new(),
};
// TODO actually add this to spec
custom_ext_prefixes.push("anon".to_string());
info!("custom_ext_prefixes: {:?}", &custom_ext_prefixes);
// 2. parse shared_preload_libraries from spec
// parse shared_preload_libraries from spec
let mut libs_vec = Vec::new();
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
@@ -740,33 +752,55 @@ LIMIT 100",
}
info!("Libraries to download: {:?}", &libs_vec);
// download extension control files & shared_preload_libraries
let get_extensions_task = extension_server::get_available_extensions(
ext_remote_storage,
&self.pgbin,
&self.pgversion,
&custom_ext_prefixes,
);
let get_libraries_task = extension_server::get_available_libraries(
// download shared_preload_libraries
let available_libraries = extension_server::get_available_libraries(
ext_remote_storage,
&self.pgbin,
&self.pgversion,
&custom_ext_prefixes,
&libs_vec,
);
)
.await?;
let (available_extensions, available_libraries) =
tokio::join!(get_extensions_task, get_libraries_task);
self.available_extensions
.set(available_extensions?)
.expect("available_extensions.set error");
self.available_libraries
.set(available_libraries?)
.set(available_libraries)
.expect("available_libraries.set error");
}
Ok(())
}
// If remote extension storage is configured,
// download extension control files
#[tokio::main]
pub async fn prepare_external_extensions(&self, compute_state: &ComputeState) -> Result<()> {
if let Some(ref ext_remote_storage) = self.ext_remote_storage {
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let spec = &pspec.spec;
// 1. parse custom extension paths from spec
let custom_ext_prefixes = match &spec.custom_extensions {
Some(custom_extensions) => custom_extensions.clone(),
None => Vec::new(),
};
info!("custom_ext_prefixes: {:?}", &custom_ext_prefixes);
// download extension control files
let available_extensions = extension_server::get_available_extensions(
ext_remote_storage,
&self.pgbin,
&self.pgversion,
&custom_ext_prefixes,
)
.await?;
self.available_extensions
.set(available_extensions)
.expect("available_extensions.set error");
}
Ok(())
}
pub async fn download_extension_files(&self, filename: String) -> Result<()> {
match &self.ext_remote_storage {
None => anyhow::bail!("No remote extension storage"),

View File

@@ -42,13 +42,15 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
}
}
pub fn launch_configurator(compute: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
pub fn launch_configurator(
compute: &Arc<ComputeNode>,
) -> Result<thread::JoinHandle<()>, std::io::Error> {
let compute = Arc::clone(compute);
Ok(thread::Builder::new()
thread::Builder::new()
.name("compute-configurator".into())
.spawn(move || {
configurator_main_loop(&compute);
info!("configurator thread is exited");
})?)
})
}

View File

@@ -1,5 +1,6 @@
// Download extension files from the extension store
// and put them in the right place in the postgres directory
use crate::compute::ComputeNode;
use anyhow::{self, bail, Context, Result};
use futures::future::join_all;
use remote_storage::*;
@@ -10,6 +11,8 @@ use std::io::{BufWriter, Write};
use std::num::{NonZeroU32, NonZeroUsize};
use std::path::{Path, PathBuf};
use std::str;
use std::sync::Arc;
use std::thread;
use tokio::io::AsyncReadExt;
use tracing::info;
@@ -426,3 +429,19 @@ async fn organized_extension_files(
}
Ok((grouped_dependencies, control_files))
}
pub fn launch_download_extensions(
compute: &Arc<ComputeNode>,
) -> Result<thread::JoinHandle<()>, std::io::Error> {
let compute = Arc::clone(compute);
thread::Builder::new()
.name("download-extensions".into())
.spawn(move || {
info!("start download_extension_files");
let compute_state = compute.state.lock().expect("error unlocking compute.state");
compute
.prepare_external_extensions(&compute_state)
.expect("error preparing extensions");
info!("download_extension_files done, exiting thread");
})
}

View File

@@ -105,10 +105,10 @@ fn watch_compute_activity(compute: &ComputeNode) {
}
/// Launch a separate compute monitor thread and return its `JoinHandle`.
pub fn launch_monitor(state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
pub fn launch_monitor(state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>, std::io::Error> {
let state = Arc::clone(state);
Ok(thread::Builder::new()
thread::Builder::new()
.name("compute-monitor".into())
.spawn(move || watch_compute_activity(&state))?)
.spawn(move || watch_compute_activity(&state))
}

View File

@@ -73,6 +73,7 @@ pub struct ComputeMetrics {
pub basebackup_ms: u64,
pub config_ms: u64,
pub total_startup_ms: u64,
pub load_libraries_ms: u64,
}
/// Response of the `/computes/{compute_id}/spec` control-plane API.

View File

@@ -1,4 +1,3 @@
import json
import os
from contextlib import closing
from io import BytesIO
@@ -55,7 +54,7 @@ def prepare_mock_ext_storage(
PUB_EXT_ROOT = f"v{pg_version}/share/extension"
PRIVATE_EXT_ROOT = f"v{pg_version}/{custom_prefix}/share/extension"
LOCAL_EXT_ROOT = f"pg_install/{pg_version}/share/postgresql/extension"
LOCAL_EXT_ROOT = f"pg_install/v{pg_version}/share/postgresql/extension"
PUB_LIB_ROOT = f"v{pg_version}/lib"
PRIVATE_LIB_ROOT = f"v{pg_version}/{custom_prefix}/lib"
@@ -249,75 +248,3 @@ def test_remote_extensions(
log.info(f"Deleted {file}")
except FileNotFoundError:
log.info(f"{file} does not exist, so cannot be deleted")
"""
This tests against the actual infra for real S3
Note in particular that we don't need to set up a bucket (real or mock)
because we are testing the files already uploaded as part of CI/CD
"""
def test_remote_extensions_in_bucket(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.REAL_S3,
test_name="test_remote_extensions_in_bucket",
enable_remote_extensions=False, # we don't enable remote extensions here; instead we use the real bucket
)
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
tenant_id, _ = env.neon_cli.create_tenant()
env.neon_cli.create_timeline("test_remote_extensions_in_bucket", tenant_id=tenant_id)
# Start a compute node and check that it can download the extensions
# and use them to CREATE EXTENSION and LOAD 'library.so'
remote_ext_config = {
"bucket": "neon-dev-extensions-us-east-2",
"region": "us-east-2",
"endpoint": None,
"prefix": "5412197734", # build tag
}
endpoint = env.endpoints.create_start(
"test_remote_extensions_in_bucket",
tenant_id=tenant_id,
remote_ext_config=json.dumps(remote_ext_config),
config_lines=["shared_preload_libraries='anon, neon'"],
)
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
# test create extension
cur.execute("CREATE EXTENSION pg_surgery")
cur.execute("create extension hnsw")
cur.execute("SELECT extname FROM pg_extension")
all_extensions = [x[0] for x in cur.fetchall()]
assert "pg_surgery" in all_extensions
assert "hnsw" in all_extensions
# test load library
cur.execute("LOAD 'hnsw'")
# test load library with .so.3.14 extension.
# Note: it should download the appropriate files, but error for the
# specified reason
try:
cur.execute("LOAD 'libpgtypes'")
except Exception as err:
correct_err_type = (
"Extension libraries are required to use the PG_MODULE_MAGIC macro." in str(err)
)
assert correct_err_type
log.info(err)
# test load extension with dependencies in a subdirectory
try:
cur.execute("CREATE EXTENSION anon")
except Exception as e:
# Check that this errors, but for the right reason
# (that it is missing dependencies, not that files failed to download)
log.info(e)
assert 'required extension "pgcrypto" is not installed' in str(e)
# test load exension with dependencies not in a subdirectory
cur.execute("CREATE EXTENSION fuzzystrmatch")
log.info("Please MANUALLY cleanup any downloaded files")