Compare commits

...

1 Commits

Author SHA1 Message Date
Alexey Kondratov
8a1ed6a1a2 chore(compute): Do a minor cleanup of remote extensions code 2025-01-27 19:03:07 +01:00
2 changed files with 27 additions and 18 deletions

View File

@@ -1486,6 +1486,7 @@ impl ComputeNode {
// First, create control files for all availale extensions
extension_server::create_control_files(remote_extensions, &self.pgbin);
// Second, preload all remote extensions specified in the shared_preload_libraries
let library_load_start_time = Utc::now();
let remote_ext_metrics = self.prepare_preload_libraries(&pspec.spec)?;
@@ -1908,8 +1909,9 @@ LIMIT 100",
.as_ref()
.ok_or(anyhow::anyhow!("Remote extensions are not configured"))?;
info!("parse shared_preload_libraries from spec.cluster.settings");
let mut libs_vec = Vec::new();
info!("parse shared_preload_libraries from spec.cluster.settings");
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
libs_vec = libs
.split(&[',', '\'', ' '])
@@ -1917,9 +1919,9 @@ LIMIT 100",
.map(str::to_string)
.collect();
}
info!("parse shared_preload_libraries from provided postgresql.conf");
// that is used in neon_local and python tests
// This is used in neon_local and python tests
info!("parse shared_preload_libraries from provided postgresql.conf");
if let Some(conf) = &spec.cluster.postgresql_conf {
let conf_lines = conf.split('\n').collect::<Vec<&str>>();
let mut shared_preload_libraries_line = "";
@@ -1943,7 +1945,10 @@ LIMIT 100",
// Assume that they are already present locally.
libs_vec.retain(|lib| remote_extensions.library_index.contains_key(lib));
info!("Downloading to shared preload libraries: {:?}", &libs_vec);
info!(
"Downloading extensions specified in shared_preload_libraries: {:?}",
&libs_vec
);
let mut download_tasks = Vec::new();
for library in &libs_vec {

View File

@@ -148,18 +148,18 @@ fn parse_pg_version(human_version: &str) -> PostgresMajorVersion {
},
_ => {}
}
panic!("Unsuported postgres version {human_version}");
panic!("Unsupported Postgres version {human_version}");
}
// download the archive for a given extension,
// unzip it, and place files in the appropriate locations (share/lib)
/// Download the archive for a given extension,
/// unzip it, and place files in the appropriate locations (share/lib)
pub async fn download_extension(
ext_name: &str,
ext_path: &RemotePath,
ext_remote_storage: &str,
pgbin: &str,
) -> Result<u64> {
info!("Download extension {:?} from {:?}", ext_name, ext_path);
info!("Downloading extension {:?} from {:?}", ext_name, ext_path);
// TODO add retry logic
let download_buffer =
@@ -200,23 +200,23 @@ pub async fn download_extension(
// move contents of the libdir / sharedir in unzipped archive to the correct local paths
for paths in [sharedir_paths, libdir_paths] {
let (zip_dir, real_dir) = paths;
info!("mv {zip_dir:?}/* {real_dir:?}");
info!("Moving {zip_dir:?}/* to {real_dir:?}");
for file in std::fs::read_dir(zip_dir)? {
let old_file = file?.path();
let new_file =
Path::new(&real_dir).join(old_file.file_name().context("error parsing file")?);
info!("moving {old_file:?} to {new_file:?}");
info!("Moving {old_file:?} to {new_file:?}");
// extension download failed: Directory not empty (os error 39)
match std::fs::rename(old_file, new_file) {
Ok(()) => info!("move succeeded"),
Ok(()) => info!("Move succeeded"),
Err(e) => {
warn!("move failed, probably because the extension already exists: {e}")
warn!("Move failed, probably because the extension already exists: {e}")
}
}
}
}
info!("done moving extension {ext_name}");
info!("Done moving extension {ext_name}");
Ok(download_size)
}
@@ -239,10 +239,16 @@ pub fn create_control_files(remote_extensions: &RemoteExtSpec, pgbin: &str) {
for (control_name, control_content) in &ext_data.control_data {
let control_path = local_sharedir.join(control_name);
if !control_path.exists() {
info!("writing file {:?}{:?}", control_path, control_content);
info!(
"Writing control file content {:?}: {:?}",
control_path, control_content
);
std::fs::write(control_path, control_content).unwrap();
} else {
warn!("control file {:?} exists both locally and remotely. ignoring the remote version.", control_path);
warn!(
"Control file {:?} exists locally. Ignoring the version from the spec.",
control_path
);
}
}
}
@@ -250,9 +256,7 @@ pub fn create_control_files(remote_extensions: &RemoteExtSpec, pgbin: &str) {
// Do request to extension storage proxy, i.e.
// curl http://pg-ext-s3-gateway/latest/v15/extensions/anon.tar.zst
// using HHTP GET
// and return the response body as bytes
//
// using HTTP GET and return the response body as bytes.
async fn download_extension_tar(ext_remote_storage: &str, ext_path: &str) -> Result<Bytes> {
let uri = format!("{}/{}", ext_remote_storage, ext_path);