Refactoring

- enable CREATE EXTENSION and LOAD test
- change test_file_download to use mock_s3

- some code cleanup
- add caching of extensions_list

- WIP downloading of shared_preload_libraries (not tested yet)
Anastasia Lubennikova
2023-06-26 15:55:34 +03:00
parent a8f848b5de
commit 1104de0b9b
6 changed files with 162 additions and 81 deletions
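
As background for the "caching of extensions_list" item above, a minimal self-contained sketch of the idea: the remote listing is fetched once and kept in compute state, and later downloads search the cached list instead of re-listing S3. The struct mirrors the ExtensionsState added in extension_server.rs below; list_remote_files is a hypothetical stand-in for remote_storage.list_files.

#[derive(Debug, Default)]
struct ExtensionsState {
    available_extensions: Vec<String>,
    available_libraries: Vec<String>,
}

// Hypothetical stand-in for the S3 listing call that the cache is meant
// to avoid repeating on every download request.
fn list_remote_files(prefix: &str) -> Vec<String> {
    vec![
        format!("{prefix}/test_ext0.control"),
        format!("{prefix}/test_ext0--1.0.sql"),
    ]
}

fn main() {
    // Fill the cache once, when the compute spec becomes known ...
    let mut state = ExtensionsState::default();
    state.available_extensions = list_remote_files("v14/share/postgresql/extension");

    // ... and let later download requests search the cached list instead of
    // issuing another list call against remote storage.
    let wanted = "test_ext0";
    let hits: Vec<&String> = state
        .available_extensions
        .iter()
        .filter(|p| p.ends_with(".sql") && p.contains(wanted))
        .collect();
    println!("sql files to download for {wanted}: {hits:?}");
}
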

View File

@@ -57,6 +57,8 @@ use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
use compute_tools::spec::*;
use compute_tools::extension_server::get_available_extensions;
const BUILD_TAG_DEFAULT: &str = "local";
fn main() -> Result<()> {
@@ -181,6 +183,24 @@ fn main() -> Result<()> {
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
tenant_id = Some(pspec.tenant_id);
new_state.pspec = Some(pspec);
// fill in list of available extensions
let rt = Runtime::new().unwrap();
if let Some(ref ext_remote_storage) = ext_remote_storage {
new_state.extensions.available_extensions =
rt.block_on(get_available_extensions(&ext_remote_storage, pgbin, None))?;
}
// append private tenant extensions
// TODO not implemented yet
// let private_ext_list = rt.block_on(get_available_extensions(
// &ext_remote_storage,
// pgbin,
// tenant_id,
// ))?;
// new_state.extensions.available_extensions.extend(private_ext_list);
spec_set = true;
} else {
spec_set = false;
@@ -194,8 +214,6 @@ fn main() -> Result<()> {
state: Mutex::new(new_state),
state_changed: Condvar::new(),
ext_remote_storage,
availiable_extensions: Vec::new(),
availiable_libraries: Vec::new(),
};
let compute = Arc::new(compute_node);
@@ -206,17 +224,6 @@ fn main() -> Result<()> {
let extension_server_port: u16 = http_port;
// even before we have the spec, we can get publicly available extensions
// TODO maybe convert get_available_extensions into ComputeNode method as well as other functions
let rt = Runtime::new().unwrap();
rt.block_on(async {
compute
.get_available_extensions(None)
.await
.context("get_available_extensions error")
})?;
drop(rt);
if !spec_set {
// No spec provided, hang waiting for it.
info!("no compute spec provided, waiting");
@@ -254,22 +261,6 @@ fn main() -> Result<()> {
let _configurator_handle =
launch_configurator(&compute).expect("cannot launch configurator thread");
// download private tenant extensions before postgres start
// TODO (see Alek's attempt to do this below)
// compute_node.available_extensions = get_available_extensions(ext_remote_storage,pg_version, pgbin,tenant_id);
// if tenant_id.is_some() {
// rt.block_on(async {
// compute
// .get_available_extensions(tenant_id)
// .await
// .context("get_available_extensions with tenant_id error")
// })?;
// }
// download preload shared libraries before postgres start (if any)
// TODO
// download_library_file();
// Start Postgres
let mut delay_exit = false;
let mut exit_code = None;

View File

@@ -16,12 +16,15 @@ use utils::lsn::Lsn;
use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::{ComputeMode, ComputeSpec};
use remote_storage::{GenericRemoteStorage, RemotePath};
use remote_storage::GenericRemoteStorage;
use crate::extension_server::get_available_libraries;
use crate::pg_helpers::*;
use crate::spec::*;
use crate::{config, extension_server};
use extension_server::ExtensionsState;
/// Compute node info shared across several `compute_ctl` threads.
pub struct ComputeNode {
// Url type maintains proper escaping
@@ -49,8 +52,6 @@ pub struct ComputeNode {
pub state_changed: Condvar,
/// S3 extensions configuration variables
pub ext_remote_storage: Option<GenericRemoteStorage>,
pub availiable_extensions: Vec<RemotePath>,
pub availiable_libraries: Vec<RemotePath>,
}
#[derive(Clone, Debug)]
@@ -63,6 +64,7 @@ pub struct ComputeState {
pub error: Option<String>,
pub pspec: Option<ParsedSpec>,
pub metrics: ComputeMetrics,
pub extensions: ExtensionsState,
}
impl ComputeState {
@@ -74,6 +76,7 @@ impl ComputeState {
error: None,
pspec: None,
metrics: ComputeMetrics::default(),
extensions: ExtensionsState::new(),
}
}
}
@@ -435,7 +438,7 @@ impl ComputeNode {
#[instrument(skip(self))]
pub fn start_compute(&self, extension_server_port: u16) -> Result<std::process::Child> {
let compute_state = self.state.lock().unwrap().clone();
let mut compute_state = self.state.lock().unwrap().clone();
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
info!(
"starting compute for project {}, operation {}, tenant {}, timeline {}",
@@ -445,6 +448,35 @@ impl ComputeNode {
pspec.timeline_id,
);
// download preload shared libraries before postgres start (if any)
let spec = &pspec.spec;
let mut libs_vec = Vec::new();
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
libs_vec = libs
.split(',')
.filter(|s| *s != "neon")
.map(str::to_string)
.collect();
}
info!("shared_preload_libraries is set to {:?}", libs_vec);
// download requested shared_preload_libraries and
// fill in list of available libraries
let rt = tokio::runtime::Runtime::new().unwrap();
if let Some(ref ext_remote_storage) = self.ext_remote_storage {
let libs = rt.block_on(get_available_libraries(
&ext_remote_storage,
&self.pgbin,
pspec.tenant_id,
&libs_vec,
))?;
compute_state.extensions.available_libraries.extend(libs);
}
self.prepare_pgdata(&compute_state, extension_server_port)?;
let start_time = Utc::now();
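
For illustration, the shared_preload_libraries parsing step added above in stand-alone form; the setting value "neon,pg_cron" is hypothetical.

fn parse_preload_libraries(value: &str) -> Vec<String> {
    // Same chain as in start_compute: split the GUC value on commas and drop
    // "neon" (presumably already present in the compute image, so it is not
    // downloaded from the extension store here).
    value
        .split(',')
        .filter(|s| *s != "neon")
        .map(str::to_string)
        .collect()
}

fn main() {
    assert_eq!(parse_preload_libraries("neon,pg_cron"), vec!["pg_cron".to_string()]);
    // Entries are not trimmed, so "neon, pg_cron" keeps the leading space:
    assert_eq!(parse_preload_libraries("neon, pg_cron"), vec![" pg_cron".to_string()]);
}
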
@@ -583,12 +615,16 @@ LIMIT 100",
}
pub async fn download_extension_sql_files(&self, filename: String) -> Result<()> {
let state = self.state.lock().unwrap().clone();
let available_extensions = state.extensions.available_extensions;
match &self.ext_remote_storage {
None => anyhow::bail!("No remote extension storage"),
Some(remote_storage) => {
extension_server::download_extension_sql_files(
&filename,
&remote_storage,
&available_extensions,
&self.pgbin,
)
.await
@@ -597,20 +633,20 @@ LIMIT 100",
}
pub async fn download_library_file(&self, filename: String) -> Result<()> {
let state = self.state.lock().unwrap().clone();
let available_libraries = state.extensions.available_libraries;
match &self.ext_remote_storage {
None => anyhow::bail!("No remote extension storage"),
Some(remote_storage) => {
extension_server::download_library_file(&filename, &remote_storage, &self.pgbin)
.await
extension_server::download_library_file(
&filename,
&available_libraries,
&remote_storage,
&self.pgbin,
)
.await
}
}
}
pub async fn get_available_extensions(&self, tenant_id: Option<TenantId>) -> Result<()> {
if let Some(remote_storage) = self.ext_remote_storage.as_ref() {
extension_server::get_available_extensions(remote_storage, &self.pgbin, tenant_id)
.await?;
}
Ok(())
}
}

View File

@@ -10,6 +10,21 @@ use tokio::io::AsyncReadExt;
use tracing::info;
use utils::id::TenantId;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExtensionsState {
pub available_extensions: Vec<RemotePath>,
pub available_libraries: Vec<RemotePath>,
}
impl ExtensionsState {
pub fn new() -> Self {
ExtensionsState {
available_extensions: Vec::new(),
available_libraries: Vec::new(),
}
}
}
fn get_pg_config(argument: &str, pgbin: &str) -> String {
// gives the result of `pg_config [argument]`
// where argument is a flag like `--version` or `--sharedir`
@@ -84,7 +99,7 @@ pub async fn get_available_extensions(
};
info!(
"get_availiable_extensions remote_sharedir: {:?}, local_sharedir: {:?}",
"get_available_extensions remote_sharedir: {:?}, local_sharedir: {:?}",
remote_sharedir, local_sharedir
);
@@ -100,30 +115,68 @@ pub async fn get_available_extensions(
Ok(from_paths)
}
// Download requested shared_preload_libraries
//
// Note that tenant_id is not optional here, because we only download libraries
// after we know the tenant spec and the tenant_id.
//
// Returns the list of all library files so it can be reused in later searches.
pub async fn get_available_libraries(
remote_storage: &GenericRemoteStorage,
pgbin: &str,
_tenant_id: TenantId,
preload_libraries: &Vec<String>,
) -> anyhow::Result<Vec<RemotePath>> {
let local_libdir: PathBuf = Path::new(&get_pg_config("--pkglibdir", pgbin)).into();
let pg_version = get_pg_version(pgbin);
let remote_libdir = RemotePath::new(&Path::new(&pg_version).join("lib/")).unwrap();
let available_libraries = remote_storage.list_files(Some(&remote_libdir)).await?;
// TODO list private libraries as well
//
// let remote_libdir_private = RemotePath::new(&Path::new(&pg_version).join(tenant_id.to_string()).join("lib/")).unwrap();
// let available_libraries_private = remote_storage.list_files(Some(&remote_libdir_private)).await?;
// available_libraries.extend(available_libraries_private);
info!("list of library files {:?}", &available_libraries);
// download all requested libraries
for lib_name in preload_libraries {
let lib_path = available_libraries
.iter()
.find(|lib: &&RemotePath| lib.object_name().unwrap() == lib_name);
match lib_path {
None => bail!("Shared library file {lib_name} is not found in the extension store"),
Some(lib_path) => {
download_helper(remote_storage, &lib_path, &local_libdir).await?;
info!("downloaded library {:?}", &lib_path);
}
}
}
return Ok(available_libraries);
}
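
In miniature, the contract of get_available_libraries above: every requested library must resolve to a file in the remote listing, or the whole call fails before Postgres starts. RemotePath and download_helper are replaced by plain strings in this sketch, and the suffix match is only an approximation of object_name().

fn resolve_requested<'a>(
    available: &'a [String],
    requested: &[String],
) -> Result<Vec<&'a String>, String> {
    let mut found = Vec::new();
    for name in requested {
        // Same policy as the match above: a single missing library aborts the
        // whole operation (the real code uses anyhow::bail!).
        match available.iter().find(|path| path.ends_with(name)) {
            Some(path) => found.push(path),
            None => {
                return Err(format!(
                    "Shared library file {name} is not found in the extension store"
                ))
            }
        }
    }
    Ok(found)
}

fn main() {
    let available = vec![
        "v14/lib/pg_cron.so".to_string(),
        "v14/lib/postgis.so".to_string(),
    ];
    assert!(resolve_requested(&available, &["pg_cron.so".to_string()]).is_ok());
    assert!(resolve_requested(&available, &["missing.so".to_string()]).is_err());
}
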
// download all sql files for a given extension name
//
pub async fn download_extension_sql_files(
ext_name: &str,
//availiable_extensions: &Vec<RemotePath>,
remote_storage: &GenericRemoteStorage,
available_extensions: &Vec<RemotePath>,
pgbin: &str,
) -> Result<()> {
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
let pg_version = get_pg_version(pgbin);
let remote_sharedir =
RemotePath::new(&Path::new(&pg_version).join("share/postgresql/extension")).unwrap();
// TODO cache availiable_extensions list on the first read to avoid unneeded s3 calls
let availiable_extensions = remote_storage.list_files(Some(&remote_sharedir)).await?;
info!(
"list of availiable_extension files {:?}",
&availiable_extensions
"list of available_extension files {:?}",
&available_extensions
);
// check if extension files exist
let files_to_download: Vec<&RemotePath> = availiable_extensions
let files_to_download: Vec<&RemotePath> = available_extensions
.iter()
.filter(|ext| {
ext.extension() == Some("sql") && ext.object_name().unwrap().starts_with(ext_name)
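
A plain-string sketch of the filter above; file names are illustrative and extension()/object_name() are approximated with ordinary string operations.

fn sql_files_for<'a>(available: &'a [String], ext_name: &str) -> Vec<&'a String> {
    available
        .iter()
        .filter(|path| {
            // Look at the last path component and keep only SQL scripts whose
            // name starts with the requested extension name.
            let name = path.rsplit('/').next().unwrap_or(path.as_str());
            name.ends_with(".sql") && name.starts_with(ext_name)
        })
        .collect()
}

fn main() {
    let available = vec![
        "v14/share/postgresql/extension/test_ext0.control".to_string(),
        "v14/share/postgresql/extension/test_ext0--1.0.sql".to_string(),
        "v14/share/postgresql/extension/other_ext--1.0.sql".to_string(),
    ];
    assert_eq!(sql_files_for(&available, "test_ext0").len(), 1);
}
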
@@ -144,22 +197,26 @@ pub async fn download_extension_sql_files(
// download shared library file
pub async fn download_library_file(
lib_name: &str,
// availiable_libraries: &Vec<RemotePath>,
available_libraries: &Vec<RemotePath>,
remote_storage: &GenericRemoteStorage,
pgbin: &str,
) -> Result<()> {
let local_libdir: PathBuf = Path::new(&get_pg_config("--pkglibdir", pgbin)).into();
let pg_version = get_pg_version(pgbin);
let remote_sharedir = RemotePath::new(&Path::new(&pg_version).join("lib/")).unwrap();
let remote_libdir = RemotePath::new(&Path::new(&pg_version).join("lib/")).unwrap();
// TODO cache availiable_libraries list on the first read to avoid unneeded s3 calls
let availiable_libraries = remote_storage.list_files(Some(&remote_sharedir)).await?;
info!(
"cached list of available_libraries files {:?}",
&available_libraries
);
// TODO cache available_libraries list on the first read to avoid unneeded s3 calls
let available_libraries = remote_storage.list_files(Some(&remote_libdir)).await?;
info!("list of library files {:?}", &availiable_libraries);
info!("list of library files {:?}", &available_libraries);
// check if the library file exists
let lib = availiable_libraries
let lib = available_libraries
.iter()
.find(|lib: &&RemotePath| lib.object_name().unwrap() == lib_name);

View File

@@ -16,8 +16,6 @@ use tokio::task;
use tracing::{error, info};
use tracing_utils::http::OtelName;
use crate::extension_server::{download_extension_sql_files, download_library_file};
fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
ComputeStatusResponse {
start_time: state.start_time,

View File

@@ -311,7 +311,7 @@ impl Endpoint {
// TODO: use future host field from safekeeper spec
// Pass the list of safekeepers to the replica so that it can connect to any of them,
// whichever is availiable.
// whichever is available.
let sk_ports = self
.env
.safekeepers

View File

@@ -42,7 +42,7 @@ relocatable = true"""
return output
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.REAL_S3])
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.MOCK_S3])
def test_file_download(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
"""
Tests we can download a file
@@ -66,7 +66,6 @@ def test_file_download(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
assert env.ext_remote_storage is not None
assert env.remote_storage_client is not None
NUM_EXT = 5
PUB_EXT_ROOT = "v14/share/postgresql/extension"
BUCKET_PREFIX = "5314225671" # this is the build number
cleanup_files = []
@@ -79,8 +78,8 @@ def test_file_download(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
public_remote_name = f"{BUCKET_PREFIX}/{PUB_EXT_ROOT}/test_ext{i}.control"
public_local_name = f"pg_install/{PUB_EXT_ROOT}/test_ext{i}.control"
# private extensions
private_ext = BytesIO(bytes(ext_contents(str(tenant_id), i), "utf-8"))
private_remote_name = f"{BUCKET_PREFIX}/{str(tenant_id)}/private_ext{i}.control"
BytesIO(bytes(ext_contents(str(tenant_id), i), "utf-8"))
f"{BUCKET_PREFIX}/{str(tenant_id)}/private_ext{i}.control"
private_local_name = f"pg_install/{PUB_EXT_ROOT}/private_ext{i}.control"
cleanup_files += [public_local_name, private_local_name]
@@ -158,20 +157,20 @@ def test_file_download(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
assert f"test_ext{i}" in all_extensions
# assert f"private_ext{i}" in all_extensions
# cur.execute("CREATE EXTENSION test_ext0")
# cur.execute("SELECT extname FROM pg_extension")
# all_extensions = [x[0] for x in cur.fetchall()]
# log.info(all_extensions)
# assert "test_ext0" in all_extensions
cur.execute("CREATE EXTENSION test_ext0")
cur.execute("SELECT extname FROM pg_extension")
all_extensions = [x[0] for x in cur.fetchall()]
log.info(all_extensions)
assert "test_ext0" in all_extensions
# try:
# cur.execute("LOAD 'test_ext0.so'")
# except Exception as e:
# # expected to fail with
# # could not load library ... test_ext.so: file too short
# # because test_ext.so is not real library file
# log.info("LOAD test_ext0.so failed (expectedly): %s", e)
# assert "file too short" in str(e)
try:
cur.execute("LOAD 'test_ext0.so'")
except Exception as e:
# expected to fail with
# could not load library ... test_ext0.so: file too short
# because test_ext0.so is not a real library file
log.info("LOAD test_ext0.so failed (as expected): %s", e)
assert "file too short" in str(e)
# TODO add more test cases:
# - try to load non-existing library