mirror of https://github.com/neondatabase/neon.git, synced 2026-01-16 09:52:54 +00:00
cache available libraries: increases efficiency and reduces code duplication
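The change, in short: instead of listing remote extension storage every time a shared library is requested, the compute node now lists it once, stores the resulting name-to-path map in a new available_libraries: Mutex<HashMap<String, RemotePath>> field, and later downloads look the file up in that cached map. A minimal sketch of the pattern, with simplified stand-in types (plain String paths instead of the repo's RemotePath, and a hypothetical list_remote_libraries() in place of the real remote-storage listing):

use std::collections::HashMap;
use std::sync::Mutex;

// Simplified stand-in for ComputeNode: only the parts relevant to the cache.
struct Node {
    // Library file name -> remote path, filled once after listing remote storage.
    available_libraries: Mutex<HashMap<String, String>>,
}

// Hypothetical listing step; the real code builds this map from remote extension storage.
fn list_remote_libraries() -> HashMap<String, String> {
    HashMap::from([("test_lib0.so".to_string(), "v14/lib/test_lib0.so".to_string())])
}

impl Node {
    // Populate the cache once, while the compute node is being prepared.
    fn refresh_available_libraries(&self) {
        *self.available_libraries.lock().unwrap() = list_remote_libraries();
    }

    // Later lookups clone the map out of the lock and consult it; no second listing.
    fn find_library(&self, name: &str) -> Option<String> {
        let libs = self.available_libraries.lock().unwrap().clone();
        libs.get(name).cloned()
    }
}

fn main() {
    let node = Node {
        available_libraries: Mutex::new(HashMap::new()),
    };
    node.refresh_available_libraries();
    println!("{:?}", node.find_library("test_lib0.so"));
}

Cloning the map out of the lock, as the diff does before calling download_library_file, also keeps the mutex guard from being held across an .await.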
@@ -49,7 +49,7 @@ use compute_api::responses::ComputeStatus;
 use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
 use compute_tools::configurator::launch_configurator;
-use compute_tools::extension_server::init_remote_storage;
+use compute_tools::extension_server::{get_pg_version, init_remote_storage};
 use compute_tools::http::api::launch_http_server;
 use compute_tools::logger::*;
 use compute_tools::monitor::launch_monitor;
@@ -187,10 +187,12 @@ fn main() -> Result<()> {
         connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
         pgdata: pgdata.to_string(),
         pgbin: pgbin.to_string(),
+        pgversion: get_pg_version(pgbin),
         live_config_allowed,
         state: Mutex::new(new_state),
         state_changed: Condvar::new(),
         ext_remote_storage,
+        available_libraries: Mutex::new(HashMap::new()),
     };
     let compute = Arc::new(compute_node);
 
@@ -1,3 +1,4 @@
+use std::collections::HashMap;
 use std::fs;
 use std::os::unix::fs::PermissionsExt;
 use std::path::Path;
@@ -17,7 +18,7 @@ use utils::lsn::Lsn;
 use compute_api::responses::{ComputeMetrics, ComputeStatus};
 use compute_api::spec::{ComputeMode, ComputeSpec};
 
-use remote_storage::GenericRemoteStorage;
+use remote_storage::{GenericRemoteStorage, RemotePath};
 
 use crate::pg_helpers::*;
 use crate::spec::*;
@@ -29,6 +30,7 @@ pub struct ComputeNode {
     pub connstr: url::Url,
     pub pgdata: String,
     pub pgbin: String,
+    pub pgversion: String,
     /// We should only allow live re- / configuration of the compute node if
     /// it uses 'pull model', i.e. it can go to control-plane and fetch
     /// the latest configuration. Otherwise, there could be a case:
@@ -50,6 +52,7 @@ pub struct ComputeNode {
     pub state_changed: Condvar,
     /// S3 extensions configuration variables
    pub ext_remote_storage: Option<GenericRemoteStorage>,
+    pub available_libraries: Mutex<HashMap<String, RemotePath>>,
 }
 
 #[derive(Clone, Debug)]
@@ -699,10 +702,6 @@ LIMIT 100",
         // Currently pytest doesn't pass cluster settings to compute_ctl
         // We need to add this to pytest.
         // and neon_local pass to spec
-        // libs_vec.push("test_lib1".to_string());
-        // libs_vec.push("private_lib1".to_string());
-        // libs_vec.push("test_lib0".to_string());
-        // libs_vec.push("private_lib0".to_string());
         // info!(
         //     "shared_preload_libraries extra settings set to {:?}",
         //     libs_vec
@@ -713,14 +712,17 @@ LIMIT 100",
         extension_server::get_available_extensions(
             ext_remote_storage,
             &self.pgbin,
+            &self.pgversion,
             &private_ext_prefixes,
         )
         .await?;
 
         info!("Libraries to download: {:?}", &libs_vec);
-        extension_server::get_available_libraries(
+        let mut available_libraries_lock = self.available_libraries.lock().unwrap();
+        *available_libraries_lock = extension_server::get_available_libraries(
             ext_remote_storage,
             &self.pgbin,
+            &self.pgversion,
             &private_ext_prefixes,
             &libs_vec,
         )
@@ -745,6 +747,7 @@ LIMIT 100",
                     &filename,
                     remote_storage,
                     &self.pgbin,
+                    &self.pgversion,
                     &private_ext_prefixes,
                 )
                 .await
@@ -756,18 +759,12 @@ LIMIT 100",
        match &self.ext_remote_storage {
            None => anyhow::bail!("No remote extension storage"),
            Some(remote_storage) => {
-                let compute_state = self.state.lock().unwrap().clone();
-                let pspec = compute_state.pspec.as_ref().expect("spec must be set");
-
-                // TODO parse private extension paths from spec instead of tenant_id
-                let tenant_id = pspec.tenant_id.to_string();
-                let private_ext_prefixes: Vec<String> = vec![tenant_id];
-
+                let available_libraries_lock = self.available_libraries.lock().unwrap().clone();
                 extension_server::download_library_file(
                     &filename,
                     remote_storage,
                     &self.pgbin,
-                    &private_ext_prefixes,
+                    &available_libraries_lock,
                 )
                 .await
             }
@@ -26,7 +26,7 @@ fn get_pg_config(argument: &str, pgbin: &str) -> String {
         .to_string()
 }
 
-fn get_pg_version(pgbin: &str) -> String {
+pub fn get_pg_version(pgbin: &str) -> String {
     // pg_config --version returns a (platform specific) human readable string
     // such as "PostgreSQL 15.4". We parse this to v14/v15
     let human_version = get_pg_config("--version", pgbin);
@@ -99,21 +99,20 @@ async fn download_helper(
 pub async fn get_available_extensions(
     remote_storage: &GenericRemoteStorage,
     pgbin: &str,
+    pg_version: &str,
     private_ext_prefixes: &Vec<String>,
 ) -> anyhow::Result<()> {
     let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
-    let pg_version = get_pg_version(pgbin);
-
     let mut paths: Vec<RemotePath> = Vec::new();
     // public extensions
     paths.push(RemotePath::new(
-        &Path::new(&pg_version).join("share/postgresql/extension"),
+        &Path::new(pg_version).join("share/postgresql/extension"),
     )?);
 
     // private extensions
     for private_prefix in private_ext_prefixes {
         paths.push(RemotePath::new(
-            &Path::new(&pg_version)
+            &Path::new(pg_version)
                 .join(private_prefix)
                 .join("share/postgresql/extension"),
         )?);
@@ -145,23 +144,17 @@ pub async fn get_available_extensions(
 pub async fn get_available_libraries(
     remote_storage: &GenericRemoteStorage,
     pgbin: &str,
+    pg_version: &str,
     private_ext_prefixes: &Vec<String>,
     preload_libraries: &Vec<String>,
-) -> anyhow::Result<()> {
-    // Return early if there are no libraries to download
-    if preload_libraries.is_empty() {
-        return Ok(());
-    }
-
+) -> anyhow::Result<HashMap<String, RemotePath>> {
     let local_libdir: PathBuf = Path::new(&get_pg_config("--pkglibdir", pgbin)).into();
-    let pg_version = get_pg_version(pgbin);
     // Construct a hashmap of all available libraries
     // example (key, value) pair: test_lib0.so, v14/lib/test_lib0.so
 
     let mut paths: Vec<RemotePath> = Vec::new();
     // public libraries
     paths.push(RemotePath::new(&Path::new(&pg_version).join("lib/")).unwrap());
 
     // private libraries
     for private_prefix in private_ext_prefixes {
         paths.push(
@@ -176,11 +169,7 @@ pub async fn get_available_libraries(
     // download all requested libraries
     for lib_name in preload_libraries {
-        // add file extension if it isn't in the filename
-        let lib_name_with_ext = if !lib_name.ends_with(".so") {
-            lib_name.to_owned() + ".so"
-        } else {
-            lib_name.to_string()
-        };
+        let lib_name_with_ext = enforce_so_end(&lib_name);
         info!("looking for library {:?}", &lib_name_with_ext);
         match all_available_libraries.get(&*lib_name_with_ext) {
             Some(remote_path) => {
@@ -189,7 +178,8 @@ pub async fn get_available_libraries(
             None => bail!("Shared library file {lib_name} is not found in the extension store"),
         }
     }
-    Ok(())
+
+    Ok(all_available_libraries)
 }
 
 // download all sqlfiles (and possibly data files) for a given extension name
@@ -198,18 +188,16 @@ pub async fn download_extension_sql_files(
     ext_name: &str,
     remote_storage: &GenericRemoteStorage,
     pgbin: &str,
+    pg_version: &str,
     private_ext_prefixes: &Vec<String>,
 ) -> Result<()> {
     let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
-
-    let pg_version = get_pg_version(pgbin);
-
     let mut paths: Vec<RemotePath> = Vec::new();
     // public extensions
     paths.push(RemotePath::new(
         &Path::new(&pg_version).join("share/postgresql/extension"),
     )?);
 
     // private extensions
     for private_prefix in private_ext_prefixes {
         paths.push(RemotePath::new(
@@ -271,37 +259,24 @@ pub async fn download_extension_sql_files(
     Ok(())
 }
 
+// appends an .so suffix to libname if it does not already have one
+fn enforce_so_end(libname: &str) -> String {
+    if !libname.ends_with(".so") {
+        format!("{}.so", libname)
+    } else {
+        libname.to_string()
+    }
+}
+
 // download shared library file
 pub async fn download_library_file(
     lib_name: &str,
     remote_storage: &GenericRemoteStorage,
     pgbin: &str,
-    private_ext_prefixes: &Vec<String>,
+    all_available_libraries: &HashMap<String, RemotePath>,
 ) -> Result<()> {
     let local_libdir: PathBuf = Path::new(&get_pg_config("--pkglibdir", pgbin)).into();
-
-    let pg_version = get_pg_version(pgbin);
-
-    let mut paths: Vec<RemotePath> = Vec::new();
-    // public libraries
-    paths.push(RemotePath::new(&Path::new(&pg_version).join("lib/")).unwrap());
-
-    // private libraries
-    for private_prefix in private_ext_prefixes {
-        paths.push(
-            RemotePath::new(&Path::new(&pg_version).join(private_prefix).join("lib")).unwrap(),
-        );
-    }
-
-    let all_available_libraries = list_files_in_prefixes(remote_storage, &paths).await?;
-
-    info!("list of library files {:?}", &all_available_libraries);
-
-    let lib_name_with_ext = if !lib_name.ends_with(".so") {
-        lib_name.to_owned() + ".so"
-    } else {
-        lib_name.to_string()
-    };
+    let lib_name_with_ext = enforce_so_end(lib_name);
     info!("looking for library {:?}", &lib_name_with_ext);
     match all_available_libraries.get(&*lib_name_with_ext) {
         Some(remote_path) => {
@@ -309,7 +284,6 @@ pub async fn download_library_file(
         }
         None => bail!("Shared library file {lib_name} is not found in the extension store"),
     }
-
     Ok(())
 }
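The duplicated ".so"-suffix handling is now factored into the private enforce_so_end helper shown above. Its behavior can be pinned down with a small test; the sketch below is illustrative only and not part of the commit (the helper is private to the module, so it is copied here to keep the example self-contained):

// Mirrors the enforce_so_end helper added above: append ".so" only when it is missing.
fn enforce_so_end(libname: &str) -> String {
    if !libname.ends_with(".so") {
        format!("{}.so", libname)
    } else {
        libname.to_string()
    }
}

#[cfg(test)]
mod tests {
    use super::enforce_so_end;

    #[test]
    fn appends_so_suffix_only_when_missing() {
        assert_eq!(enforce_so_end("test_lib0"), "test_lib0.so");
        assert_eq!(enforce_so_end("test_lib0.so"), "test_lib0.so");
        assert_eq!(enforce_so_end("private_lib1"), "private_lib1.so");
    }
}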
@@ -99,7 +99,7 @@ def prepare_mock_ext_storage(
         public_local_name = f"{LOCAL_LIB_ROOT}/test_lib{i}.so"
         private_library = BytesIO(bytes("\n111\n", "utf-8"))
         private_remote_name = f"{bucket_prefix}/{PRIVATE_LIB_ROOT}/private_lib{i}.so"
-        private_local_name = f"{LOCAL_EXT_ROOT}/private_lib{i}.so"
+        private_local_name = f"{LOCAL_LIB_ROOT}/private_lib{i}.so"
 
         log.info(f"uploading library to {public_remote_name}")
         log.info(f"uploading library to {private_remote_name}")