Refactor locking in page cache, and use async I/O for WAL redo

Story on why:

The apply_wal_records() function spawned the special postgres process
to perform WAL redo. That was done in a blocking fashion: it launches
the process, then it writes the command to its stdin, then it reads
the result from its stdout.  I wanted to also read the child process's
stderr, and forward it to the page server's log (which is just the
page server's stderr ATM). That has classic potential for deadlock:
the child process might block trying to write to stderr/stdout, if the
parent isn't reading it. So the parent needs to perform the read/write
with the child's stdin/stdout/stderr in an async fashion.  So I
refactored the code in walredo.c into async style.  But it started to
hang. It took me a while to figure it out; async makes for really ugly
stacktraces, it's hard to figure out what's going on. The call path
goes like this: Page service -> get_page_at_lsn() in page cache ->
apply_wal_records() the page service is written in async style. And I
refactored apply_wal_recorsds() to also be async. BUT,
get_page_at_lsn() acquires a lock, in a blocking fashion.

The lock-up happened like this:

- a GetPage@LSN request arrives. The asynch handler thread calls
  get_page_at_lsn(), which acquires a lock. While holding the lock,
  it calls apply_wal_records().
- apply_wal_records() launches the child process, and waits on it
  using async functions
- more GetPage@LSN requests arrive. They also call get_page_at_lsn().
  But because the lock is already held, they all block

The subsequent GetPage@LSN calls that block waiting on the lock use up
all the async handler threads. All the threads are locked up, so there
is no one left to make progress on the apply_wal_records() call, so it
never releases the lock. Deadlock So my lesson here is that mixing
async and blocking styles is painful. Googling around, this is a well
known problem, there are long philosophical discussions on "what color
is your function".  My plan to fix that is to move the WAL redo into a
separate thread or thread pool, and have the GetPage@LSN handlers
communicate with it using channels.  Having a separate thread pool for
it makes sense anyway in the long run. We'll want to keep the postgres
process around, rather than launch it separately every time we need to
reconstruct a page. Also, when we're not busy reconstructing pages
that are needed right now by GetPage@LSN calls, we want to proactively
apply incoming WAL records from a "backlog".

Solution:

Launch a dedicated thread for WAL redo at startup. It has an event loop,
where it listens on a channel for requests to apply WAL. When a page
server thread needs some WAL to be applied, it sends the request on
the channel, and waits for response. After it's done the WAL redo process
puts the new page image in the page cache, and wakes up the requesting
thread using a condition variable.

This also needed locking changes in the page cache. Each cache entry now
has a reference counter and a dedicated Mutex to protect just the entry.
This commit is contained in:
Heikki Linnakangas
2021-03-22 11:04:23 +02:00
committed by Stas Kelvich
parent 1c1812df05
commit 303a546aba
6 changed files with 387 additions and 149 deletions

11
Cargo.lock generated
View File

@@ -352,6 +352,16 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8aebca1129a03dc6dc2b127edd729435bbc4a37e1d5f4d7513165089ceb02634"
[[package]]
name = "crossbeam-channel"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dca26ee1f8d361640700bde38b2c37d8c22b3ce2d360e1fc1c74ea4b0aa7d775"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.3"
@@ -1045,6 +1055,7 @@ version = "0.1.0"
dependencies = [
"byteorder",
"bytes",
"crossbeam-channel",
"futures",
"lazy_static",
"log",

View File

@@ -7,6 +7,7 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
crossbeam-channel = "0.5.0"
rand = "0.8.3"
regex = "1.4.5"
bytes = "1.0.1"

View File

@@ -12,19 +12,25 @@ use pageserver::walreceiver;
use std::io::Error;
fn main() -> Result<(), Error> {
let mut threads = Vec::new();
// Initialize logger
stderrlog::new()
.verbosity(3)
.module("pageserver")
.init().unwrap();
info!("starting...");
// First, restore the latest base backup from S3. (We don't persist anything
// to local disk at the moment, so we need to do this at every startup)
restore_s3::restore_main();
// Initialize the WAL applicator
let walredo_thread = thread::spawn(|| {
walredo::wal_applicator_main();
});
threads.push(walredo_thread);
let mut threads = Vec::new();
// Before opening up for connections, restore the latest base backup from S3.
// (We don't persist anything to local disk at the moment, so we need to do
// this at every startup)
restore_s3::restore_main();
// Launch the WAL receiver thread. It will try to connect to the WAL safekeeper,
// and stream the WAL. If the connection is lost, it will reconnect on its own.
@@ -37,7 +43,6 @@ fn main() -> Result<(), Error> {
// GetPage@LSN requests are served by another thread. (It uses async I/O,
// but the code in page_service sets up it own thread pool for that)
let page_server_thread = thread::spawn(|| {
// thread code
page_service::thread_main();

View File

@@ -1,18 +1,133 @@
//
// Page Cache holds all the different page versions and WAL records
//
//
// The Page Cache is a BTreeMap, keyed by the RelFileNode an blocknumber, and the LSN.
// The BTreeMap is protected by a Mutex, and each cache entry is protected by another
// per-entry mutex.
//
use core::ops::Bound::Included;
use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::sync::Arc;
use std::sync::Condvar;
use std::sync::Mutex;
use bytes::Bytes;
use lazy_static::lazy_static;
use rand::Rng;
use log::*;
use crate::walredo;
use crossbeam_channel::unbounded;
use crossbeam_channel::{Sender, Receiver};
pub struct PageCache {
shared: Mutex<PageCacheShared>,
// Channel for communicating with the WAL redo process here.
pub walredo_sender: Sender<Arc<CacheEntry>>,
pub walredo_receiver: Receiver<Arc<CacheEntry>>,
}
//
// Shared data structure, holding page cache and related auxiliary information
//
struct PageCacheShared {
// The actual page cache
pagecache: BTreeMap<CacheKey, Arc<CacheEntry>>,
// Relation n_blocks cache
//
// This hashtable should be updated together with the pagecache. Now it is
// accessed unreasonably often through the smgr_nblocks(). It is better to just
// cache it in postgres smgr and ask only on restart.
relsize_cache: HashMap<RelTag, u32>,
// What page versions do we hold in the cache? If we get GetPage with
// LSN < first_valid_lsn, that's an error because we (no longer) hold that
// page version. If we get a request > last_valid_lsn, we need to wait until
// we receive all the WAL up to the request.
//
first_valid_lsn: u64,
last_valid_lsn: u64
}
lazy_static! {
pub static ref PAGECACHE : PageCache = init_page_cache();
}
fn init_page_cache() -> PageCache
{
// Initialize the channel between the page cache and the WAL applicator
let (s, r) = unbounded();
PageCache {
shared: Mutex::new(
PageCacheShared {
pagecache: BTreeMap::new(),
relsize_cache: HashMap::new(),
first_valid_lsn: 0,
last_valid_lsn: 0,
}),
walredo_sender: s,
walredo_receiver: r,
}
}
//
// We store two kinds of entries in the page cache:
//
// 1. Ready-made images of the block
// 2. WAL records, to be applied on top of the "previous" entry
//
// Some WAL records will initialize the page from scratch. For such records,
// the 'will_init' flag is set. They don't need the previous page image before
// applying. The 'will_init' flag is set for records containing a full-page image,
// and for records with the BKPBLOCK_WILL_INIT flag. These differ from PageImages
// stored directly in the cache entry in that you still need to run the WAL redo
// routine to generate the page image.
//
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct CacheKey {
pub tag: BufferTag,
pub lsn: u64
}
pub struct CacheEntry {
pub key: CacheKey,
pub content: Mutex<CacheEntryContent>,
// Condition variable used by the WAL redo service, to wake up
// requester.
//
// FIXME: this takes quite a lot of space. Consider using parking_lot::Condvar
// or something else.
pub walredo_condvar: Condvar
}
pub struct CacheEntryContent {
pub page_image: Option<Bytes>,
pub wal_record: Option<WALRecord>,
pub apply_pending: bool,
}
impl CacheEntry {
fn new(key: CacheKey) -> CacheEntry {
CacheEntry {
key: key,
content: Mutex::new(CacheEntryContent {
page_image: None,
wal_record: None,
apply_pending: false,
}),
walredo_condvar: Condvar::new(),
}
}
}
#[derive(Eq, PartialEq, Hash, Clone, Copy)]
pub struct RelTag {
@@ -38,65 +153,6 @@ pub struct WALRecord {
pub rec: Bytes
}
//
// Shared data structure, holding page cache and related auxiliary information
//
struct PageCacheShared {
// The actual page cache
pagecache: BTreeMap<CacheKey, CacheEntry>,
// Relation n_blocks cache
//
// This hashtable should be updated together with the pagecache. Now it is
// accessed unreasonably often through the smgr_nblocks(). It is better to just
// cache it in postgres smgr and ask only on restart.
relsize_cache: HashMap<RelTag, u32>,
// What page versions do we hold in the cache? If we get GetPage with
// LSN < first_valid_lsn, that's an error because we (no longer) hold that
// page version. If we get a request > last_valid_lsn, we need to wait until
// we receive all the WAL up to the request.
//
first_valid_lsn: u64,
last_valid_lsn: u64
}
lazy_static! {
static ref PAGECACHE: Mutex<PageCacheShared> = Mutex::new(
PageCacheShared {
pagecache: BTreeMap::new(),
relsize_cache: HashMap::new(),
first_valid_lsn: 0,
last_valid_lsn: 0,
});
}
//
// We store two kinds of entries in the page cache:
//
// 1. Ready-made images of the block
// 2. WAL records, to be applied on top of the "previous" entry
//
// Some WAL records will initialize the page from scratch. For such records,
// the 'will_init' flag is set. They don't need the previous page image before
// applying. The 'will_init' flag is set for records containing a full-page image,
// and for records with the BKPBLOCK_WILL_INIT flag. These differ from PageImages
// stored directly in the cache entry in that you still need to run the WAL redo
// routine to generate the page image.
//
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub struct CacheKey {
pub tag: BufferTag,
pub lsn: u64
}
#[derive(Clone)]
enum CacheEntry {
PageImage(Bytes),
WALRecord(WALRecord)
}
// Public interface functions
@@ -120,66 +176,133 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result<Bytes, Box<dyn Error>
};
let maxkey = CacheKey {
tag: tag,
lsn: lsn + 1
lsn: lsn
};
let shared = PAGECACHE.lock().unwrap();
let entry_rc: Arc<CacheEntry>;
{
let shared = PAGECACHE.shared.lock().unwrap();
if lsn > shared.last_valid_lsn {
// TODO: Wait for the WAL receiver to catch up
}
if lsn < shared.first_valid_lsn {
return Err(format!("LSN {} has already been removed", lsn))?;
}
let pagecache = &shared.pagecache;
let entries = pagecache.range(&minkey .. &maxkey);
let mut records: Vec<WALRecord> = Vec::new();
let mut base_img: Option<Bytes> = None;
for (_key, e) in entries.rev() {
match e {
CacheEntry::PageImage(img) => {
// We have a base image. No need to dig deeper into the list of
// records
base_img = Some(img.clone());
break;
}
CacheEntry::WALRecord(rec) => {
records.push(rec.clone());
if rec.will_init {
debug!("WAL record at LSN {} initializes the page", rec.lsn);
}
}
if lsn > shared.last_valid_lsn {
// TODO: Wait for the WAL receiver to catch up
}
if lsn < shared.first_valid_lsn {
return Err(format!("LSN {} has already been removed", lsn))?;
}
let pagecache = &shared.pagecache;
let mut entries = pagecache.range((Included(&minkey), Included(&maxkey)));
let entry_opt = entries.next_back();
if entry_opt.is_none() {
static ZERO_PAGE:[u8; 8192] = [0 as u8; 8192];
return Ok(Bytes::from_static(&ZERO_PAGE));
/* return Err("could not find page image")?; */
}
let (_key, entry) = entry_opt.unwrap();
entry_rc = entry.clone();
// Now that we have a reference to the cache entry, drop the lock on the map.
// It's important to do this before waiting on the condition variable below,
// and better to do it as soon as possible to maximize concurrency.
}
let page_img: Bytes;
// Lock the cache entry and dig the page image out of it.
let page_img;
{
let mut entry_content = entry_rc.content.lock().unwrap();
if !records.is_empty() {
records.reverse();
if let Some(img) = &entry_content.page_image {
assert!(!entry_content.apply_pending);
page_img = img.clone();
} else if entry_content.wal_record.is_some() {
page_img = walredo::apply_wal_records(tag, base_img, &records)?;
//
// If this page needs to be reconstructed by applying some WAL,
// send a request to the WAL redo thread.
//
if !entry_content.apply_pending {
assert!(!entry_content.apply_pending);
entry_content.apply_pending = true;
debug!("applied {} WAL records to produce page image at LSN {}", records.len(), lsn);
let s = &PAGECACHE.walredo_sender;
s.send(entry_rc.clone())?;
}
// Here, we could put the new page image back to the page cache, to save effort if the
// same (or later) page version is requested again. It's a tradeoff though, as each
// page image consumes some memory
} else if base_img.is_some() {
page_img = base_img.unwrap();
} else {
let zero_page = vec![0 as u8; 8192];
page_img = Bytes::from(zero_page);
/* return Err("could not find page image")?; */
while entry_content.apply_pending {
entry_content = entry_rc.walredo_condvar.wait(entry_content).unwrap();
}
page_img = entry_content.page_image.as_ref().unwrap().clone();
} else {
// No base image, and no WAL record. Huh?
return Err(format!("no page image or WAL record for requested page"))?;
}
}
return Ok(page_img);
}
//
// Collect all the WAL records that are needed to reconstruct a page
// image for the given cache entry.
//
// Returns an old page image (if any), and a vector of WAL records to apply
// over it.
//
pub fn collect_records_for_apply(entry: &CacheEntry) -> (Option<Bytes>, Vec<WALRecord>)
{
// Scan the BTreeMap backwards, starting from the given entry.
let shared = PAGECACHE.shared.lock().unwrap();
let pagecache = &shared.pagecache;
let minkey = CacheKey {
tag: entry.key.tag,
lsn: 0
};
let maxkey = CacheKey {
tag: entry.key.tag,
lsn: entry.key.lsn
};
let entries = pagecache.range((Included(&minkey), Included(&maxkey)));
// the last entry in the range should be the CacheEntry we were given
//let _last_entry = entries.next_back();
//assert!(last_entry == entry);
let mut base_img: Option<Bytes> = None;
let mut records: Vec<WALRecord> = Vec::new();
// Scan backwards, collecting the WAL records, until we hit an
// old page image.
for (_key, e) in entries.rev() {
let e = e.content.lock().unwrap();
if let Some(img) = &e.page_image {
// We have a base image. No need to dig deeper into the list of
// records
base_img = Some(img.clone());
break;
} else if let Some(rec) = &e.wal_record {
records.push(rec.clone());
// If this WAL record initializes the page, no need to dig deeper.
if rec.will_init {
debug!("WAL record at LSN {} initializes the page", rec.lsn);
break;
}
} else {
panic!("no base image and no WAL record on cache entry");
}
}
records.reverse();
return (base_img, records);
}
//
// Adds a WAL record to the page cache
//
@@ -190,10 +313,10 @@ pub fn put_wal_record(tag: BufferTag, rec: WALRecord)
lsn: rec.lsn
};
let entry = CacheEntry::WALRecord(rec);
let entry = CacheEntry::new(key.clone());
entry.content.lock().unwrap().wal_record = Some(rec);
let mut shared = PAGECACHE.lock().unwrap();
// let pagecache = &mut shared.pagecache;
let mut shared = PAGECACHE.shared.lock().unwrap();
let rel_tag = RelTag {
spcnode: tag.spcnode,
@@ -206,12 +329,12 @@ pub fn put_wal_record(tag: BufferTag, rec: WALRecord)
*rel_entry = tag.blknum + 1;
}
let oldentry = shared.pagecache.insert(key, entry);
let oldentry = shared.pagecache.insert(key, Arc::new(entry));
assert!(oldentry.is_none());
}
//
//
// Memorize a full image of a page version
//
pub fn put_page_image(tag: BufferTag, lsn: u64, img: Bytes)
{
@@ -220,12 +343,13 @@ pub fn put_page_image(tag: BufferTag, lsn: u64, img: Bytes)
lsn: lsn
};
let entry = CacheEntry::PageImage(img);
let entry = CacheEntry::new(key.clone());
entry.content.lock().unwrap().page_image = Some(img);
let mut shared = PAGECACHE.lock().unwrap();
let mut shared = PAGECACHE.shared.lock().unwrap();
let pagecache = &mut shared.pagecache;
let oldentry = pagecache.insert(key, entry);
let oldentry = pagecache.insert(key, Arc::new(entry));
assert!(oldentry.is_none());
debug!("inserted page image for {}/{}/{}_{} blk {} at {}",
@@ -235,7 +359,7 @@ pub fn put_page_image(tag: BufferTag, lsn: u64, img: Bytes)
//
pub fn advance_last_valid_lsn(lsn: u64)
{
let mut shared = PAGECACHE.lock().unwrap();
let mut shared = PAGECACHE.shared.lock().unwrap();
// Can't move backwards.
assert!(lsn >= shared.last_valid_lsn);
@@ -246,7 +370,7 @@ pub fn advance_last_valid_lsn(lsn: u64)
//
pub fn _advance_first_valid_lsn(lsn: u64)
{
let mut shared = PAGECACHE.lock().unwrap();
let mut shared = PAGECACHE.shared.lock().unwrap();
// Can't move backwards.
assert!(lsn >= shared.first_valid_lsn);
@@ -260,7 +384,7 @@ pub fn _advance_first_valid_lsn(lsn: u64)
pub fn init_valid_lsn(lsn: u64)
{
let mut shared = PAGECACHE.lock().unwrap();
let mut shared = PAGECACHE.shared.lock().unwrap();
assert!(shared.first_valid_lsn == 0);
assert!(shared.last_valid_lsn == 0);
@@ -271,7 +395,7 @@ pub fn init_valid_lsn(lsn: u64)
pub fn get_last_valid_lsn() -> u64
{
let shared = PAGECACHE.lock().unwrap();
let shared = PAGECACHE.shared.lock().unwrap();
return shared.last_valid_lsn;
}
@@ -294,7 +418,7 @@ pub fn _test_get_page_at_lsn()
let mut tag: Option<BufferTag> = None;
{
let shared = PAGECACHE.lock().unwrap();
let shared = PAGECACHE.shared.lock().unwrap();
let pagecache = &shared.pagecache;
if pagecache.is_empty() {
@@ -332,7 +456,7 @@ pub fn _test_get_page_at_lsn()
// the replica's current replay LSN.
pub fn relsize_inc(rel: &RelTag, to: Option<u32>)
{
let mut shared = PAGECACHE.lock().unwrap();
let mut shared = PAGECACHE.shared.lock().unwrap();
let entry = shared.relsize_cache.entry(*rel).or_insert(0);
if let Some(to) = to {
@@ -344,14 +468,14 @@ pub fn relsize_inc(rel: &RelTag, to: Option<u32>)
pub fn relsize_get(rel: &RelTag) -> u32
{
let mut shared = PAGECACHE.lock().unwrap();
let mut shared = PAGECACHE.shared.lock().unwrap();
let entry = shared.relsize_cache.entry(*rel).or_insert(0);
*entry
}
pub fn relsize_exist(rel: &RelTag) -> bool
{
let shared = PAGECACHE.lock().unwrap();
let shared = PAGECACHE.shared.lock().unwrap();
let relsize_cache = &shared.relsize_cache;
relsize_cache.contains_key(rel)
}

View File

@@ -208,7 +208,9 @@ impl FeMessage {
pub fn thread_main() {
// Create a new thread pool
let runtime = runtime::Runtime::new().unwrap();
//let runtime = runtime::Runtime::new().unwrap();
let runtime = runtime::Builder::new_current_thread().enable_all().build().unwrap();
let listen_address = "127.0.0.1:5430";
info!("Starting page server on {}", listen_address);

View File

@@ -14,59 +14,154 @@
// TODO: Even though the postgres code runs in a separate process,
// it's not a secure sandbox.
//
use std::process::{Command, Stdio};
use std::io::{Read, Write, Error};
use tokio::runtime::Runtime;
use tokio::process::{Command};
use std::process::Stdio;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::io::AsyncBufReadExt;
use std::io::Error;
use std::assert;
use std::sync::{Arc};
use log::*;
use bytes::{Bytes, BytesMut, BufMut};
use crate::page_cache::BufferTag;
use crate::page_cache::CacheEntry;
use crate::page_cache::WALRecord;
use crate::page_cache;
//
// Main entry point for the WAL applicator thread.
//
pub fn wal_applicator_main()
{
// We block on waiting for requests on the walredo request channel, but
// use async I/O to communicate with the child process. Initialize the
// runtime for the async part.
let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
// Loop forever, handling requests as they come.
let walredo_channel_receiver = &page_cache::PAGECACHE.walredo_receiver;
loop {
let request = walredo_channel_receiver.recv().unwrap();
handle_apply_request(&runtime, request);
}
}
fn handle_apply_request(runtime: &Runtime, entry_rc: Arc<CacheEntry>)
{
let tag = entry_rc.key.tag;
let (base_img, records) = page_cache::collect_records_for_apply(entry_rc.as_ref());
let mut entry = entry_rc.content.lock().unwrap();
entry.apply_pending = false;
let apply_result = runtime.block_on(apply_wal_records(tag, base_img, records));
if let Err(e) = apply_result {
error!("could not apply WAL records: {}", e);
} else {
entry.page_image = Some(apply_result.unwrap());
}
// Wake up the requester, whether the operation succeeded or not.
entry_rc.walredo_condvar.notify_all();
}
//
// Apply given WAL records ('records') over an old page image. Returns
// new page image.
//
pub fn apply_wal_records(tag: BufferTag, base_img: Option<Bytes>, records: &Vec<WALRecord>) -> Result<Bytes, Error>
async fn apply_wal_records(tag: BufferTag, base_img: Option<Bytes>, records: Vec<WALRecord>) -> Result<Bytes, Error>
{
//
// Start postgres binary in special WAL redo mode.
//
let mut child =
let child =
Command::new("postgres")
.arg("--wal-redo")
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.expect("postgres --wal-redo command failed to start");
let stdin = child.stdin.as_mut().expect("Failed to open stdin");
let mut stdin = child.stdin.expect("failed to open child's stdin");
let stderr = child.stderr.expect("failed to open childs' stderr");
let mut stdout = child.stdout.expect("failed to open childs' stdout");
// Send base image, if any. (If the record initializes the page, previous page
// version is not needed.)
stdin.write(&build_begin_redo_for_block_msg(tag))?;
if base_img.is_some() {
stdin.write(&build_push_page_msg(tag, base_img.unwrap()))?;
}
//
// This async block sends all the commands to the process.
//
// For reasons I don't understand, this needs to be a "move" block;
// otherwise the stdin pipe doesn't get closed, despite the shutdown()
// call.
//
let f_stdin = async move {
// Send base image, if any. (If the record initializes the page, previous page
// version is not needed.)
stdin.write(&build_begin_redo_for_block_msg(tag)).await?;
if base_img.is_some() {
stdin.write(&build_push_page_msg(tag, base_img.unwrap())).await?;
}
// Send WAL records.
for rec in records.iter() {
let r = rec.clone();
// Send WAL records.
for rec in records.iter() {
let r = rec.clone();
stdin.write(&build_apply_record_msg(r.lsn, r.rec))?;
}
stdin.write(&build_apply_record_msg(r.lsn, r.rec)).await?;
}
debug!("sent {} WAL records to wal redo postgres process", records.len());
// Send GetPage command to get the result back
stdin.write(&build_get_page_msg(tag)).await?;
debug!("sent GetPage");
stdin.flush().await?;
stdin.shutdown().await?;
debug!("stdin finished");
Ok::<(), Error>(())
};
// This async block reads the child's stderr, and forwards it to the logger
let f_stderr = async move {
let mut stderr_buffered = tokio::io::BufReader::new(stderr);
let mut line = String::new();
loop {
let res = stderr_buffered.read_line(&mut line).await;
if res.is_err() {
debug!("could not convert line to utf-8");
continue;
}
if res.unwrap() == 0 {
break;
}
debug!("{}", line.trim());
line.clear();
}
Ok::<(), Error>(())
};
// Read back new page image
stdin.write(&build_get_page_msg(tag))?;
let mut buf = vec![0u8; 8192];
child.stdout.unwrap().read_exact(&mut buf)?;
let f_stdout = async move {
let mut buf = [0u8; 8192];
stdout.read_exact(&mut buf).await?;
debug!("got response");
Ok::<[u8;8192], Error>(buf)
};
// Kill the process. This closes its stdin, which should signal the process
// to terminate. TODO: SIGKILL if needed
//child.wait();
return Ok(Bytes::from(buf));
let res = futures::try_join!(f_stdout, f_stdin, f_stderr)?;
let buf = res.0;
Ok::<Bytes, Error>(Bytes::from(std::vec::Vec::from(buf)))
}
fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes