Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-25 22:30:38 +00:00

Compare commits: RemoteExte...local_file (10 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 458ca82d75 | |
| | 0dface838d | |
| | f7878c5157 | |
| | 53c870b8e7 | |
| | c23b65914e | |
| | b51d3f6b2b | |
| | 137472db91 | |
| | f817985f2b | |
| | 2e94e1428e | |
| | 9a486ca109 | |
@@ -324,6 +324,8 @@ impl PostgresNode {
             max_connections = 100\n\
             wal_sender_timeout = 0\n\
             wal_level = replica\n\
+            zenith.file_cache_size = 4096\n\
+            zenith.file_cache_path = '/tmp/file.cache'\n\
             listen_addresses = '{address}'\n\
             port = {port}\n",
             address = self.address.ip(),

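Rendered without the Rust string escapes, the generated postgresql.conf fragment now looks like this (the address and port shown are illustrative; the real values are substituted from `self.address`):

```
max_connections = 100
wal_sender_timeout = 0
wal_level = replica
zenith.file_cache_size = 4096
zenith.file_cache_path = '/tmp/file.cache'
listen_addresses = '127.0.0.1'   # illustrative; from {address}
port = 55432                     # illustrative; from {port}
```
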
@@ -222,10 +222,6 @@ fn bootstrap_timeline(
     // LSN, and any WAL after that.
     let timeline = repo.create_empty_timeline(tli)?;
     restore_local_repo::import_timeline_from_postgres_datadir(&pgdata_path, &*timeline, lsn)?;
-
-    let wal_dir = pgdata_path.join("pg_wal");
-    restore_local_repo::import_timeline_wal(&wal_dir, &*timeline, lsn)?;
-
     timeline.checkpoint()?;
 
     println!(

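Taken together with the hunks below (the startup WAL replay dropped from LayeredRepository, the deletion of import_timeline_wal itself, and the removal of write_wal_file from the walreceiver), this change appears to stop the pageserver from persisting incoming WAL as segment files on local disk and re-importing them at bootstrap and restart.
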
@@ -11,7 +11,7 @@
 //! parent timeline, and the last LSN that has been written to disk.
 //!
 
-use anyhow::{anyhow, bail, Context, Result};
+use anyhow::{bail, Context, Result};
 use bookfile::Book;
 use bytes::Bytes;
 use lazy_static::lazy_static;

@@ -35,7 +35,6 @@ use crate::layered_repository::inmemory_layer::FreezeLayers;
 use crate::relish::*;
 use crate::relish_storage::storage_uploader::QueueBasedRelishUploader;
 use crate::repository::{GcResult, Repository, Timeline, WALRecord};
-use crate::restore_local_repo::import_timeline_wal;
 use crate::walredo::WalRedoManager;
 use crate::PageServerConf;
 use crate::{ZTenantId, ZTimelineId};

@@ -253,11 +252,6 @@ impl LayeredRepository {
             timelineid,
             timeline.get_last_record_lsn()
         );
-        let wal_dir = self
-            .conf
-            .timeline_path(&timelineid, &self.tenantid)
-            .join("wal");
-        import_timeline_wal(&wal_dir, timeline.as_ref(), timeline.get_last_record_lsn())?;
 
         if cfg!(debug_assertions) {
             // check again after wal loading

@@ -791,47 +785,39 @@ impl Timeline for LayeredTimeline {
 
         debug!("put_truncation: {} to {} blocks at {}", rel, relsize, lsn);
 
-        let oldsize = self
-            .get_relish_size(rel, self.get_last_record_lsn())?
-            .ok_or_else(|| {
-                anyhow!(
-                    "attempted to truncate non-existent relish {} at {}",
-                    rel,
-                    lsn
-                )
-            })?;
-
-        if oldsize <= relsize {
-            return Ok(());
-        }
-        let old_last_seg = (oldsize - 1) / RELISH_SEG_SIZE;
-
-        let last_remain_seg = if relsize == 0 {
-            0
-        } else {
-            (relsize - 1) / RELISH_SEG_SIZE
-        };
-
-        // Drop segments beyond the last remaining segment.
-        for remove_segno in (last_remain_seg + 1)..=old_last_seg {
-            let seg = SegmentTag {
-                rel,
-                segno: remove_segno,
-            };
-            self.perform_write_op(seg, lsn, |layer| layer.drop_segment(lsn))?;
-        }
-
-        // Truncate the last remaining segment to the specified size
-        if relsize == 0 || relsize % RELISH_SEG_SIZE != 0 {
-            let seg = SegmentTag {
-                rel,
-                segno: last_remain_seg,
-            };
-            self.perform_write_op(seg, lsn, |layer| {
-                layer.put_truncation(lsn, relsize % RELISH_SEG_SIZE)
-            })?;
-        }
-        self.decrease_current_logical_size((oldsize - relsize) * BLCKSZ as u32);
+        if let Some(oldsize) = self.get_relish_size(rel, self.get_last_record_lsn())? {
+            if oldsize <= relsize {
+                return Ok(());
+            }
+            let old_last_seg = (oldsize - 1) / RELISH_SEG_SIZE;
+
+            let last_remain_seg = if relsize == 0 {
+                0
+            } else {
+                (relsize - 1) / RELISH_SEG_SIZE
+            };
+
+            // Drop segments beyond the last remaining segment.
+            for remove_segno in (last_remain_seg + 1)..=old_last_seg {
+                let seg = SegmentTag {
+                    rel,
+                    segno: remove_segno,
+                };
+                self.perform_write_op(seg, lsn, |layer| layer.drop_segment(lsn))?;
+            }
+
+            // Truncate the last remaining segment to the specified size
+            if relsize == 0 || relsize % RELISH_SEG_SIZE != 0 {
+                let seg = SegmentTag {
+                    rel,
+                    segno: last_remain_seg,
+                };
+                self.perform_write_op(seg, lsn, |layer| {
+                    layer.put_truncation(lsn, relsize % RELISH_SEG_SIZE)
+                })?;
+            }
+            self.decrease_current_logical_size((oldsize - relsize) * BLCKSZ as u32);
+        }
         Ok(())
     }
 

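The refactored put_truncation keeps the same segment arithmetic, only wrapped in `if let Some(oldsize)` so that truncating a relish that does not exist becomes a no-op instead of an error. A minimal, self-contained sketch of that arithmetic, assuming an illustrative RELISH_SEG_SIZE of 1024 blocks (the real constant is defined in the pageserver's relish code):

```rust
// Sketch of the segment arithmetic used by put_truncation above.
const RELISH_SEG_SIZE: u32 = 1024; // illustrative value, not the real constant

fn main() {
    let oldsize: u32 = 3000; // relish currently occupies segments 0, 1, 2
    let relsize: u32 = 1024; // truncate down to exactly one full segment

    let old_last_seg = (oldsize - 1) / RELISH_SEG_SIZE;
    assert_eq!(old_last_seg, 2);

    let last_remain_seg = if relsize == 0 {
        0
    } else {
        (relsize - 1) / RELISH_SEG_SIZE
    };
    assert_eq!(last_remain_seg, 0);

    // Segments 1 and 2 are dropped whole...
    assert_eq!((last_remain_seg + 1)..=old_last_seg, 1..=2);
    // ...and since relsize is an exact multiple of the segment size,
    // the last remaining segment needs no partial truncation.
    assert_eq!(relsize % RELISH_SEG_SIZE, 0);
}
```
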
@@ -32,8 +32,8 @@ pub mod defaults {
     // FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
     // would be more appropriate. But a low value forces the code to be exercised more,
     // which is good for now to trigger bugs.
-    pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 64 * 1024 * 1024;
-    pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(100);
+    pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
+    pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1);
 
     pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
     pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100);

@@ -9,9 +9,7 @@ use std::cmp::min;
 use std::fs;
 use std::fs::File;
 use std::io::Read;
-use std::io::Seek;
-use std::io::SeekFrom;
-use std::path::{Path, PathBuf};
+use std::path::Path;
 
 use anyhow::Result;
 use bytes::{Buf, Bytes};

@@ -283,100 +281,6 @@ fn import_slru_file(timeline: &dyn Timeline, lsn: Lsn, slru: SlruKind, path: &Path
     Ok(())
 }
 
-/// Scan PostgreSQL WAL files in given directory
-/// and load all records >= 'startpoint' into the repository.
-pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: Lsn) -> Result<()> {
-    let mut waldecoder = WalStreamDecoder::new(startpoint);
-
-    let mut segno = startpoint.segment_number(pg_constants::WAL_SEGMENT_SIZE);
-    let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
-    let mut last_lsn = startpoint;
-
-    let checkpoint_bytes = timeline.get_page_at_lsn(RelishTag::Checkpoint, 0, startpoint)?;
-    let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
-
-    loop {
-        // FIXME: assume postgresql tli 1 for now
-        let filename = XLogFileName(1, segno, pg_constants::WAL_SEGMENT_SIZE);
-        let mut buf = Vec::new();
-
-        //Read local file
-        let mut path = walpath.join(&filename);
-
-        // It could be as .partial
-        if !PathBuf::from(&path).exists() {
-            path = walpath.join(filename + ".partial");
-        }
-
-        // Slurp the WAL file
-        let open_result = File::open(&path);
-        if let Err(e) = &open_result {
-            if e.kind() == std::io::ErrorKind::NotFound {
-                break;
-            }
-        }
-        let mut file = open_result?;
-
-        if offset > 0 {
-            file.seek(SeekFrom::Start(offset as u64))?;
-        }
-
-        let nread = file.read_to_end(&mut buf)?;
-        if nread != pg_constants::WAL_SEGMENT_SIZE - offset as usize {
-            // Maybe allow this for .partial files?
-            error!("read only {} bytes from WAL file", nread);
-        }
-
-        waldecoder.feed_bytes(&buf);
-
-        let mut nrecords = 0;
-        loop {
-            let rec = waldecoder.poll_decode();
-            if rec.is_err() {
-                // Assume that an error means we've reached the end of
-                // a partial WAL record. So that's ok.
-                trace!("WAL decoder error {:?}", rec);
-                break;
-            }
-            if let Some((lsn, recdata)) = rec.unwrap() {
-                // The previous record has been handled, let the repository know that
-                // it is up-to-date to this LSN. (We do this here on the "next" iteration,
-                // rather than right after the save_decoded_record, because at the end of
-                // the WAL, we will also need to perform the update of the checkpoint data
-                // with the same LSN as the last actual record.)
-                timeline.advance_last_record_lsn(last_lsn);
-
-                let decoded = decode_wal_record(recdata.clone());
-                save_decoded_record(&mut checkpoint, timeline, &decoded, recdata, lsn)?;
-                last_lsn = lsn;
-            } else {
-                break;
-            }
-            nrecords += 1;
-        }
-
-        info!("imported {} records up to {}", nrecords, last_lsn);
-
-        segno += 1;
-        offset = 0;
-    }
-
-    if last_lsn != startpoint {
-        info!(
-            "reached end of WAL at {}, updating checkpoint info",
-            last_lsn
-        );
-        let checkpoint_bytes = checkpoint.encode();
-        timeline.put_page_image(RelishTag::Checkpoint, 0, last_lsn, checkpoint_bytes)?;
-
-        timeline.advance_last_record_lsn(last_lsn);
-    } else {
-        info!("no WAL to import at {}", last_lsn);
-    }
-
-    Ok(())
-}
-
 
 ///
 /// Helper function to parse a WAL record and call the Timeline's PUT functions for all the
 /// relations/pages that the record affects.

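For reference, the removed importer walked WAL segment files by the standard LSN-to-segment mapping. A self-contained sketch of the arithmetic behind the `Lsn::segment_number` and `Lsn::segment_offset` calls above (the helper functions here are illustrative stand-ins, not the zenith_utils API):

```rust
// Sketch of the LSN-to-segment-file mapping used by the removed importer.
const WAL_SEGMENT_SIZE: u64 = 16 * 1024 * 1024; // stock PostgreSQL default

fn segment_number(lsn: u64, seg_size: u64) -> u64 {
    lsn / seg_size
}

fn segment_offset(lsn: u64, seg_size: u64) -> u64 {
    lsn % seg_size
}

fn main() {
    let startpoint: u64 = 0x0169_0000; // LSN 0/1690000
    // Import starts in segment file 1, at byte offset 0x690000, which is
    // why the removed code seeked into the first file before reading.
    assert_eq!(segment_number(startpoint, WAL_SEGMENT_SIZE), 1);
    assert_eq!(segment_offset(startpoint, WAL_SEGMENT_SIZE), 0x69_0000);
}
```
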
@@ -23,8 +23,6 @@ use postgres_types::PgLsn;
 use std::cmp::{max, min};
 use std::collections::HashMap;
 use std::fs;
-use std::fs::{File, OpenOptions};
-use std::io::{Seek, SeekFrom, Write};
 use std::str::FromStr;
 use std::sync::Mutex;
 use std::thread;

@@ -192,15 +190,6 @@ fn walreceiver_main(
                     let endlsn = startlsn + data.len() as u64;
                     let prev_last_rec_lsn = last_rec_lsn;
-
-                    write_wal_file(
-                        conf,
-                        startlsn,
-                        &timelineid,
-                        pg_constants::WAL_SEGMENT_SIZE,
-                        data,
-                        &tenantid,
-                    )?;
 
                     trace!("received XLogData between {} and {}", startlsn, endlsn);
 
                     waldecoder.feed_bytes(data);

@@ -403,98 +392,3 @@ pub fn identify_system(client: &mut Client) -> Result<IdentifySystem, Error> {
         Err(IdentifyError.into())
     }
 }
-
-fn write_wal_file(
-    conf: &PageServerConf,
-    startpos: Lsn,
-    timelineid: &ZTimelineId,
-    wal_seg_size: usize,
-    buf: &[u8],
-    tenantid: &ZTenantId,
-) -> anyhow::Result<()> {
-    let mut bytes_left: usize = buf.len();
-    let mut bytes_written: usize = 0;
-    let mut partial;
-    let mut start_pos = startpos;
-    const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
-
-    let wal_dir = conf.wal_dir_path(timelineid, tenantid);
-
-    /* Extract WAL location for this block */
-    let mut xlogoff = start_pos.segment_offset(wal_seg_size);
-
-    while bytes_left != 0 {
-        let bytes_to_write;
-
-        /*
-         * If crossing a WAL boundary, only write up until we reach wal
-         * segment size.
-         */
-        if xlogoff + bytes_left > wal_seg_size {
-            bytes_to_write = wal_seg_size - xlogoff;
-        } else {
-            bytes_to_write = bytes_left;
-        }
-
-        /* Open file */
-        let segno = start_pos.segment_number(wal_seg_size);
-        let wal_file_name = XLogFileName(
-            1, // FIXME: always use Postgres timeline 1
-            segno,
-            wal_seg_size,
-        );
-        let wal_file_path = wal_dir.join(wal_file_name.clone());
-        let wal_file_partial_path = wal_dir.join(wal_file_name.clone() + ".partial");
-
-        {
-            let mut wal_file: File;
-            /* Try to open already completed segment */
-            if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
-                wal_file = file;
-                partial = false;
-            } else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) {
-                /* Try to open existed partial file */
-                wal_file = file;
-                partial = true;
-            } else {
-                /* Create and fill new partial file */
-                partial = true;
-                match OpenOptions::new()
-                    .create(true)
-                    .write(true)
-                    .open(&wal_file_partial_path)
-                {
-                    Ok(mut file) => {
-                        for _ in 0..(wal_seg_size / XLOG_BLCKSZ) {
-                            file.write_all(ZERO_BLOCK)?;
-                        }
-                        wal_file = file;
-                    }
-                    Err(e) => {
-                        error!("Failed to open log file {:?}: {}", &wal_file_path, e);
-                        return Err(e.into());
-                    }
-                }
-            }
-            wal_file.seek(SeekFrom::Start(xlogoff as u64))?;
-            wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?;
-
-            // FIXME: Flush the file
-            //wal_file.sync_all()?;
-        }
-        /* Write was successful, advance our position */
-        bytes_written += bytes_to_write;
-        bytes_left -= bytes_to_write;
-        start_pos += bytes_to_write as u64;
-        xlogoff += bytes_to_write;
-
-        /* Did we reach the end of a WAL segment? */
-        if start_pos.segment_offset(wal_seg_size) == 0 {
-            xlogoff = 0;
-            if partial {
-                fs::rename(&wal_file_partial_path, &wal_file_path)?;
-            }
-        }
-    }
-    Ok(())
-}

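The core of the deleted write_wal_file is its boundary-splitting loop: a payload that crosses a segment boundary is written in chunks so that each segment file receives only its own bytes. A minimal sketch of that logic, using a hypothetical `split_writes` helper and the stock 16 MB PostgreSQL segment size:

```rust
// Sketch of the boundary-splitting loop from the removed write_wal_file.
// split_writes is a hypothetical helper for illustration only.
const WAL_SEG_SIZE: usize = 16 * 1024 * 1024; // 16 MB, the PostgreSQL default

fn split_writes(mut xlogoff: usize, mut bytes_left: usize) -> Vec<usize> {
    let mut chunks = Vec::new();
    while bytes_left != 0 {
        // If crossing a segment boundary, only write up to the boundary.
        let bytes_to_write = if xlogoff + bytes_left > WAL_SEG_SIZE {
            WAL_SEG_SIZE - xlogoff
        } else {
            bytes_left
        };
        chunks.push(bytes_to_write);
        bytes_left -= bytes_to_write;
        xlogoff = (xlogoff + bytes_to_write) % WAL_SEG_SIZE;
    }
    chunks
}

fn main() {
    // 1000 bytes arriving 100 bytes before a segment boundary: 100 bytes
    // finish the old segment (which the real code then renamed away from
    // .partial), and the remaining 900 start the next one.
    assert_eq!(split_writes(WAL_SEG_SIZE - 100, 1000), vec![100, 900]);
}
```
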
@@ -12,5 +12,5 @@ def test_pgbench(postgres: PostgresFactory, pg_bin, zenith_cli):
 
     connstr = pg.connstr()
 
-    pg_bin.run_capture(['pgbench', '-i', connstr])
-    pg_bin.run_capture(['pgbench'] + '-c 10 -T 5 -P 1 -M prepared'.split() + [connstr])
+    pg_bin.run_capture(['pgbench', '-i', '-s', '100', connstr])
+    pg_bin.run_capture(['pgbench'] + '-c 1 -N -T 100 -P 1 -M prepared'.split() + [connstr])

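In pgbench terms, the new test initializes at scale factor 100 (`-s 100`, roughly ten million pgbench_accounts rows) and then runs a single client (`-c 1`) for 100 seconds (`-T 100`) with per-second progress output (`-P 1`), prepared statements (`-M prepared`), and the built-in simple-update script (`-N`), which skips the contended pgbench_tellers and pgbench_branches updates. That trades the old short, 10-client burst for a longer, steadier write load.
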
Submodule vendor/postgres updated: 9374fe0963...607255fb7a