diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 4cbd769661..5a4c00feff 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -1562,7 +1562,7 @@ impl LayeredTimeline { Ok(layer) } - fn put_value(&self, key: Key, lsn: Lsn, val: Value) -> Result<()> { + fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> { //info!("PUT: key {} at {}", key, lsn); let layer = self.get_layer_for_write(lsn)?; layer.put_value(key, lsn, val)?; @@ -1708,23 +1708,8 @@ impl LayeredTimeline { let delta_path = self.create_delta_layer(&frozen_layer)?; layer_paths_to_upload = HashSet::from([delta_path]); } - - // Sync the new layer to disk. - // - // We must also fsync the timeline dir to ensure the directory entries for - // new layer files are durable - // - // TODO: If we're running inside 'flush_frozen_layers' and there are multiple - // files to flush, it might be better to first write them all, and then fsync - // them all in parallel. - par_fsync::par_fsync(&[ - new_delta_path.clone(), - self.conf.timeline_path(&self.timeline_id, &self.tenant_id), - ])?; fail_point!("checkpoint-before-sync"); - fail_point!("flush-frozen"); - // The new on-disk layers are now in the layer map. We can remove the // in-memory layer from the map now. { @@ -2531,7 +2516,7 @@ impl Deref for LayeredTimelineWriter<'_> { } impl<'a> TimelineWriter<'_> for LayeredTimelineWriter<'a> { - fn put(&self, key: Key, lsn: Lsn, value: Value) -> Result<()> { + fn put(&self, key: Key, lsn: Lsn, value: &Value) -> Result<()> { self.tl.put_value(key, lsn, value) } diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index bffb946f7e..87e6877520 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -267,13 +267,13 @@ impl InMemoryLayer { /// Common subroutine of the public put_wal_record() and put_page_image() functions. /// Adds the page version to the in-memory tree - pub fn put_value(&self, key: Key, lsn: Lsn, val: Value) -> Result<()> { + pub fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> { trace!("put_value key {} at {}/{}", key, self.timelineid, lsn); let mut inner = self.inner.write().unwrap(); inner.assert_writeable(); - let off = inner.file.write_blob(&Value::ser(&val)?)?; + let off = inner.file.write_blob(&Value::ser(val)?)?; let vec_map = inner.index.entry(key).or_default(); let old = vec_map.append_or_update_last(lsn, off).unwrap().0; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 2158e9f644..7a3e9a0f2c 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -523,7 +523,6 @@ impl PageServerHandler { base_lsn: Lsn, _end_lsn: Lsn, ) -> anyhow::Result<()> { - thread_mgr::associate_with(Some(tenant_id), Some(timeline_id)); let _enter = info_span!("import basebackup", timeline = %timeline_id, tenant = %tenant_id).entered(); @@ -573,7 +572,6 @@ impl PageServerHandler { start_lsn: Lsn, end_lsn: Lsn, ) -> anyhow::Result<()> { - thread_mgr::associate_with(Some(tenant_id), Some(timeline_id)); let _enter = info_span!("import wal", timeline = %timeline_id, tenant = %tenant_id).entered(); @@ -777,7 +775,7 @@ impl PageServerHandler { /* Send a tarball of the latest layer on the timeline */ { let mut writer = CopyDataSink { pgb }; - let mut basebackup = basebackup::Basebackup::new(&mut writer, &timeline, lsn)?; + let basebackup = basebackup::Basebackup::new(&mut writer, &timeline, lsn)?; span.record("lsn", &basebackup.lsn.to_string().as_str()); basebackup.send_tarball()?; } diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 2cf0c343b9..9717226a7d 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -916,7 +916,7 @@ impl<'a, R: Repository> DatadirModification<'a, R> { let mut result: Result<()> = Ok(()); self.pending_updates.retain(|&key, value| { if result.is_ok() && (is_rel_block_key(key) || is_slru_block_key(key)) { - result = writer.put(key, self.lsn, value); + result = writer.put(key, self.lsn, &value); false } else { true @@ -945,7 +945,7 @@ impl<'a, R: Repository> DatadirModification<'a, R> { let pending_nblocks = self.pending_nblocks; for (key, value) in self.pending_updates { - writer.put(key, self.lsn, value)?; + writer.put(key, self.lsn, &value)?; } for key_range in self.pending_deletions { writer.delete(key_range.clone(), self.lsn)?; diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index d25dc8914d..3387cc7a51 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -406,7 +406,7 @@ pub trait TimelineWriter<'a> { /// /// This will implicitly extend the relation, if the page is beyond the /// current end-of-file. - fn put(&self, key: Key, lsn: Lsn, value: Value) -> Result<()>; + fn put(&self, key: Key, lsn: Lsn, value: &Value) -> Result<()>; fn delete(&self, key_range: Range, lsn: Lsn) -> Result<()>; diff --git a/test_runner/batch_others/test_import.py b/test_runner/batch_others/test_import.py index 63dc42ee3e..feb28f0c33 100644 --- a/test_runner/batch_others/test_import.py +++ b/test_runner/batch_others/test_import.py @@ -1,5 +1,5 @@ import pytest -from fixtures.neon_fixtures import NeonEnvBuilder, wait_for_upload, wait_for_last_record_lsn +from fixtures.zenith_fixtures import ZenithEnvBuilder, wait_for_upload, wait_for_last_record_lsn from fixtures.utils import lsn_from_hex, lsn_to_hex from uuid import UUID, uuid4 import tarfile @@ -10,14 +10,14 @@ import json from fixtures.utils import subprocess_capture from fixtures.log_helper import log from contextlib import closing -from fixtures.neon_fixtures import pg_distrib_dir +from fixtures.zenith_fixtures import pg_distrib_dir @pytest.mark.timeout(600) -def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_builder): +def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, zenith_env_builder): # Put data in vanilla pg vanilla_pg.start() - vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser") + vanilla_pg.safe_psql("create user zenith_admin with password 'postgres' superuser") vanilla_pg.safe_psql('''create table t as select 'long string to consume some space' || g from generate_series(1,300000) g''') assert vanilla_pg.safe_psql('select count(*) from t') == [(300000, )] @@ -59,12 +59,12 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build timeline = uuid4() # Set up pageserver for import - neon_env_builder.enable_local_fs_remote_storage() - env = neon_env_builder.init_start() + zenith_env_builder.enable_local_fs_remote_storage() + env = zenith_env_builder.init_start() env.pageserver.http_client().tenant_create(tenant) def import_tar(base, wal): - env.neon_cli.raw_cli([ + env.zenith_cli.raw_cli([ "timeline", "import", "--tenant-id", @@ -102,97 +102,3 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build # Check it worked pg = env.postgres.create_start(node_name, tenant_id=tenant) assert pg.safe_psql('select count(*) from t') == [(300000, )] - - -@pytest.mark.timeout(600) -def test_import_from_pageserver(test_output_dir, pg_bin, vanilla_pg, neon_env_builder): - - num_rows = 3000 - neon_env_builder.num_safekeepers = 1 - neon_env_builder.enable_local_fs_remote_storage() - env = neon_env_builder.init_start() - - env.neon_cli.create_branch('test_import_from_pageserver') - pgmain = env.postgres.create_start('test_import_from_pageserver') - log.info("postgres is running on 'test_import_from_pageserver' branch") - - timeline = pgmain.safe_psql("SHOW neon.timeline_id")[0][0] - - with closing(pgmain.connect()) as conn: - with conn.cursor() as cur: - # data loading may take a while, so increase statement timeout - cur.execute("SET statement_timeout='300s'") - cur.execute(f'''CREATE TABLE tbl AS SELECT 'long string to consume some space' || g - from generate_series(1,{num_rows}) g''') - cur.execute("CHECKPOINT") - - cur.execute('SELECT pg_current_wal_insert_lsn()') - lsn = cur.fetchone()[0] - log.info(f"start_backup_lsn = {lsn}") - - # Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq. - # PgBin sets it automatically, but here we need to pipe psql output to the tar command. - psql_env = {'LD_LIBRARY_PATH': os.path.join(str(pg_distrib_dir), 'lib')} - - # Get a fullbackup from pageserver - query = f"fullbackup { env.initial_tenant.hex} {timeline} {lsn}" - cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query] - result_basepath = pg_bin.run_capture(cmd, env=psql_env) - tar_output_file = result_basepath + ".stdout" - - # Stop the first pageserver instance, erase all its data - env.postgres.stop_all() - env.pageserver.stop() - - dir_to_clear = Path(env.repo_dir) / 'tenants' - shutil.rmtree(dir_to_clear) - os.mkdir(dir_to_clear) - - #start the pageserver again - env.pageserver.start() - - # Import using another tenantid, because we use the same pageserver. - # TODO Create another pageserver to maeke test more realistic. - tenant = uuid4() - - # Import to pageserver - node_name = "import_from_pageserver" - client = env.pageserver.http_client() - client.tenant_create(tenant) - env.neon_cli.raw_cli([ - "timeline", - "import", - "--tenant-id", - tenant.hex, - "--timeline-id", - timeline, - "--node-name", - node_name, - "--base-lsn", - lsn, - "--base-tarfile", - os.path.join(tar_output_file), - ]) - - # Wait for data to land in s3 - wait_for_last_record_lsn(client, tenant, UUID(timeline), lsn_from_hex(lsn)) - wait_for_upload(client, tenant, UUID(timeline), lsn_from_hex(lsn)) - - # Check it worked - pg = env.postgres.create_start(node_name, tenant_id=tenant) - assert pg.safe_psql('select count(*) from tbl') == [(num_rows, )] - - # Take another fullbackup - query = f"fullbackup { tenant.hex} {timeline} {lsn}" - cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query] - result_basepath = pg_bin.run_capture(cmd, env=psql_env) - new_tar_output_file = result_basepath + ".stdout" - - # Check it's the same as the first fullbackup - # TODO pageserver should be checking checksum - assert os.path.getsize(tar_output_file) == os.path.getsize(new_tar_output_file) - - # Check that gc works - psconn = env.pageserver.connect() - pscur = psconn.cursor() - pscur.execute(f"do_gc {tenant.hex} {timeline} 0")