mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-24 00:20:37 +00:00
Resolve conflicts
This commit is contained in:
@@ -1562,7 +1562,7 @@ impl LayeredTimeline {
|
||||
Ok(layer)
|
||||
}
|
||||
|
||||
fn put_value(&self, key: Key, lsn: Lsn, val: Value) -> Result<()> {
|
||||
fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> {
|
||||
//info!("PUT: key {} at {}", key, lsn);
|
||||
let layer = self.get_layer_for_write(lsn)?;
|
||||
layer.put_value(key, lsn, val)?;
|
||||
@@ -1708,23 +1708,8 @@ impl LayeredTimeline {
|
||||
let delta_path = self.create_delta_layer(&frozen_layer)?;
|
||||
layer_paths_to_upload = HashSet::from([delta_path]);
|
||||
}
|
||||
|
||||
// Sync the new layer to disk.
|
||||
//
|
||||
// We must also fsync the timeline dir to ensure the directory entries for
|
||||
// new layer files are durable
|
||||
//
|
||||
// TODO: If we're running inside 'flush_frozen_layers' and there are multiple
|
||||
// files to flush, it might be better to first write them all, and then fsync
|
||||
// them all in parallel.
|
||||
par_fsync::par_fsync(&[
|
||||
new_delta_path.clone(),
|
||||
self.conf.timeline_path(&self.timeline_id, &self.tenant_id),
|
||||
])?;
|
||||
fail_point!("checkpoint-before-sync");
|
||||
|
||||
fail_point!("flush-frozen");
|
||||
|
||||
// The new on-disk layers are now in the layer map. We can remove the
|
||||
// in-memory layer from the map now.
|
||||
{
|
||||
@@ -2531,7 +2516,7 @@ impl Deref for LayeredTimelineWriter<'_> {
|
||||
}
|
||||
|
||||
impl<'a> TimelineWriter<'_> for LayeredTimelineWriter<'a> {
|
||||
fn put(&self, key: Key, lsn: Lsn, value: Value) -> Result<()> {
|
||||
fn put(&self, key: Key, lsn: Lsn, value: &Value) -> Result<()> {
|
||||
self.tl.put_value(key, lsn, value)
|
||||
}
|
||||
|
||||
|
||||
@@ -267,13 +267,13 @@ impl InMemoryLayer {
|
||||
|
||||
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
|
||||
/// Adds the page version to the in-memory tree
|
||||
pub fn put_value(&self, key: Key, lsn: Lsn, val: Value) -> Result<()> {
|
||||
pub fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> {
|
||||
trace!("put_value key {} at {}/{}", key, self.timelineid, lsn);
|
||||
let mut inner = self.inner.write().unwrap();
|
||||
|
||||
inner.assert_writeable();
|
||||
|
||||
let off = inner.file.write_blob(&Value::ser(&val)?)?;
|
||||
let off = inner.file.write_blob(&Value::ser(val)?)?;
|
||||
|
||||
let vec_map = inner.index.entry(key).or_default();
|
||||
let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
|
||||
|
||||
@@ -523,7 +523,6 @@ impl PageServerHandler {
|
||||
base_lsn: Lsn,
|
||||
_end_lsn: Lsn,
|
||||
) -> anyhow::Result<()> {
|
||||
thread_mgr::associate_with(Some(tenant_id), Some(timeline_id));
|
||||
let _enter =
|
||||
info_span!("import basebackup", timeline = %timeline_id, tenant = %tenant_id).entered();
|
||||
|
||||
@@ -573,7 +572,6 @@ impl PageServerHandler {
|
||||
start_lsn: Lsn,
|
||||
end_lsn: Lsn,
|
||||
) -> anyhow::Result<()> {
|
||||
thread_mgr::associate_with(Some(tenant_id), Some(timeline_id));
|
||||
let _enter =
|
||||
info_span!("import wal", timeline = %timeline_id, tenant = %tenant_id).entered();
|
||||
|
||||
@@ -777,7 +775,7 @@ impl PageServerHandler {
|
||||
/* Send a tarball of the latest layer on the timeline */
|
||||
{
|
||||
let mut writer = CopyDataSink { pgb };
|
||||
let mut basebackup = basebackup::Basebackup::new(&mut writer, &timeline, lsn)?;
|
||||
let basebackup = basebackup::Basebackup::new(&mut writer, &timeline, lsn)?;
|
||||
span.record("lsn", &basebackup.lsn.to_string().as_str());
|
||||
basebackup.send_tarball()?;
|
||||
}
|
||||
|
||||
@@ -916,7 +916,7 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
|
||||
let mut result: Result<()> = Ok(());
|
||||
self.pending_updates.retain(|&key, value| {
|
||||
if result.is_ok() && (is_rel_block_key(key) || is_slru_block_key(key)) {
|
||||
result = writer.put(key, self.lsn, value);
|
||||
result = writer.put(key, self.lsn, &value);
|
||||
false
|
||||
} else {
|
||||
true
|
||||
@@ -945,7 +945,7 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
|
||||
let pending_nblocks = self.pending_nblocks;
|
||||
|
||||
for (key, value) in self.pending_updates {
|
||||
writer.put(key, self.lsn, value)?;
|
||||
writer.put(key, self.lsn, &value)?;
|
||||
}
|
||||
for key_range in self.pending_deletions {
|
||||
writer.delete(key_range.clone(), self.lsn)?;
|
||||
|
||||
@@ -406,7 +406,7 @@ pub trait TimelineWriter<'a> {
|
||||
///
|
||||
/// This will implicitly extend the relation, if the page is beyond the
|
||||
/// current end-of-file.
|
||||
fn put(&self, key: Key, lsn: Lsn, value: Value) -> Result<()>;
|
||||
fn put(&self, key: Key, lsn: Lsn, value: &Value) -> Result<()>;
|
||||
|
||||
fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> Result<()>;
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, wait_for_upload, wait_for_last_record_lsn
|
||||
from fixtures.zenith_fixtures import ZenithEnvBuilder, wait_for_upload, wait_for_last_record_lsn
|
||||
from fixtures.utils import lsn_from_hex, lsn_to_hex
|
||||
from uuid import UUID, uuid4
|
||||
import tarfile
|
||||
@@ -10,14 +10,14 @@ import json
|
||||
from fixtures.utils import subprocess_capture
|
||||
from fixtures.log_helper import log
|
||||
from contextlib import closing
|
||||
from fixtures.neon_fixtures import pg_distrib_dir
|
||||
from fixtures.zenith_fixtures import pg_distrib_dir
|
||||
|
||||
|
||||
@pytest.mark.timeout(600)
|
||||
def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_builder):
|
||||
def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, zenith_env_builder):
|
||||
# Put data in vanilla pg
|
||||
vanilla_pg.start()
|
||||
vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
|
||||
vanilla_pg.safe_psql("create user zenith_admin with password 'postgres' superuser")
|
||||
vanilla_pg.safe_psql('''create table t as select 'long string to consume some space' || g
|
||||
from generate_series(1,300000) g''')
|
||||
assert vanilla_pg.safe_psql('select count(*) from t') == [(300000, )]
|
||||
@@ -59,12 +59,12 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
|
||||
timeline = uuid4()
|
||||
|
||||
# Set up pageserver for import
|
||||
neon_env_builder.enable_local_fs_remote_storage()
|
||||
env = neon_env_builder.init_start()
|
||||
zenith_env_builder.enable_local_fs_remote_storage()
|
||||
env = zenith_env_builder.init_start()
|
||||
env.pageserver.http_client().tenant_create(tenant)
|
||||
|
||||
def import_tar(base, wal):
|
||||
env.neon_cli.raw_cli([
|
||||
env.zenith_cli.raw_cli([
|
||||
"timeline",
|
||||
"import",
|
||||
"--tenant-id",
|
||||
@@ -102,97 +102,3 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
|
||||
# Check it worked
|
||||
pg = env.postgres.create_start(node_name, tenant_id=tenant)
|
||||
assert pg.safe_psql('select count(*) from t') == [(300000, )]
|
||||
|
||||
|
||||
@pytest.mark.timeout(600)
|
||||
def test_import_from_pageserver(test_output_dir, pg_bin, vanilla_pg, neon_env_builder):
|
||||
|
||||
num_rows = 3000
|
||||
neon_env_builder.num_safekeepers = 1
|
||||
neon_env_builder.enable_local_fs_remote_storage()
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
env.neon_cli.create_branch('test_import_from_pageserver')
|
||||
pgmain = env.postgres.create_start('test_import_from_pageserver')
|
||||
log.info("postgres is running on 'test_import_from_pageserver' branch")
|
||||
|
||||
timeline = pgmain.safe_psql("SHOW neon.timeline_id")[0][0]
|
||||
|
||||
with closing(pgmain.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# data loading may take a while, so increase statement timeout
|
||||
cur.execute("SET statement_timeout='300s'")
|
||||
cur.execute(f'''CREATE TABLE tbl AS SELECT 'long string to consume some space' || g
|
||||
from generate_series(1,{num_rows}) g''')
|
||||
cur.execute("CHECKPOINT")
|
||||
|
||||
cur.execute('SELECT pg_current_wal_insert_lsn()')
|
||||
lsn = cur.fetchone()[0]
|
||||
log.info(f"start_backup_lsn = {lsn}")
|
||||
|
||||
# Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq.
|
||||
# PgBin sets it automatically, but here we need to pipe psql output to the tar command.
|
||||
psql_env = {'LD_LIBRARY_PATH': os.path.join(str(pg_distrib_dir), 'lib')}
|
||||
|
||||
# Get a fullbackup from pageserver
|
||||
query = f"fullbackup { env.initial_tenant.hex} {timeline} {lsn}"
|
||||
cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query]
|
||||
result_basepath = pg_bin.run_capture(cmd, env=psql_env)
|
||||
tar_output_file = result_basepath + ".stdout"
|
||||
|
||||
# Stop the first pageserver instance, erase all its data
|
||||
env.postgres.stop_all()
|
||||
env.pageserver.stop()
|
||||
|
||||
dir_to_clear = Path(env.repo_dir) / 'tenants'
|
||||
shutil.rmtree(dir_to_clear)
|
||||
os.mkdir(dir_to_clear)
|
||||
|
||||
#start the pageserver again
|
||||
env.pageserver.start()
|
||||
|
||||
# Import using another tenantid, because we use the same pageserver.
|
||||
# TODO Create another pageserver to maeke test more realistic.
|
||||
tenant = uuid4()
|
||||
|
||||
# Import to pageserver
|
||||
node_name = "import_from_pageserver"
|
||||
client = env.pageserver.http_client()
|
||||
client.tenant_create(tenant)
|
||||
env.neon_cli.raw_cli([
|
||||
"timeline",
|
||||
"import",
|
||||
"--tenant-id",
|
||||
tenant.hex,
|
||||
"--timeline-id",
|
||||
timeline,
|
||||
"--node-name",
|
||||
node_name,
|
||||
"--base-lsn",
|
||||
lsn,
|
||||
"--base-tarfile",
|
||||
os.path.join(tar_output_file),
|
||||
])
|
||||
|
||||
# Wait for data to land in s3
|
||||
wait_for_last_record_lsn(client, tenant, UUID(timeline), lsn_from_hex(lsn))
|
||||
wait_for_upload(client, tenant, UUID(timeline), lsn_from_hex(lsn))
|
||||
|
||||
# Check it worked
|
||||
pg = env.postgres.create_start(node_name, tenant_id=tenant)
|
||||
assert pg.safe_psql('select count(*) from tbl') == [(num_rows, )]
|
||||
|
||||
# Take another fullbackup
|
||||
query = f"fullbackup { tenant.hex} {timeline} {lsn}"
|
||||
cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query]
|
||||
result_basepath = pg_bin.run_capture(cmd, env=psql_env)
|
||||
new_tar_output_file = result_basepath + ".stdout"
|
||||
|
||||
# Check it's the same as the first fullbackup
|
||||
# TODO pageserver should be checking checksum
|
||||
assert os.path.getsize(tar_output_file) == os.path.getsize(new_tar_output_file)
|
||||
|
||||
# Check that gc works
|
||||
psconn = env.pageserver.connect()
|
||||
pscur = psconn.cursor()
|
||||
pscur.execute(f"do_gc {tenant.hex} {timeline} 0")
|
||||
|
||||
Reference in New Issue
Block a user