feat(mito): Port parquet writer and reader to mito2 (#2018)

* feat(mito): Port Batch and BufferedWriter

* feat: encode metadata to parquet

* feat: define BatchReader trait

* chore: ParquetWriter write_all takes `&mut self`

* feat(mito): port ParquetReader

* chore: fix typo

* chore: address CR comment
Yingwen
2023-07-24 18:35:21 +09:00
committed by GitHub
parent 1f371f5e6e
commit 5f65e3ff44
13 changed files with 684 additions and 6 deletions

Cargo.lock

@@ -5511,6 +5511,7 @@ dependencies = [
"anymap",
"aquamarine",
"arc-swap",
"async-compat",
"async-stream",
"async-trait",
"chrono",
@@ -5536,6 +5537,7 @@ dependencies = [
"log-store",
"metrics",
"object-store",
"parquet",
"regex",
"serde",
"serde_json",


@@ -12,6 +12,7 @@ test = ["common-test-util"]
aquamarine = "0.3"
anymap = "1.0.0-beta.2"
arc-swap = "1.0"
async-compat = "0.2"
async-stream.workspace = true
async-trait = "0.1"
chrono.workspace = true
@@ -36,6 +37,7 @@ lazy_static = "1.4"
log-store = { path = "../log-store" }
metrics.workspace = true
object-store = { path = "../object-store" }
parquet = { workspace = true, features = ["async"] }
regex = "1.5"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"


@@ -17,6 +17,7 @@ use std::any::Any;
use common_datasource::compression::CompressionType;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use datatypes::arrow::error::ArrowError;
use snafu::{Location, Snafu};
use store_api::manifest::ManifestVersion;
use store_api::storage::RegionId;
@@ -117,6 +118,50 @@ pub enum Error {
region_id: RegionId,
location: Location,
},
#[snafu(display(
"Failed to create RecordBatch from vectors, location: {}, source: {}",
location,
source
))]
NewRecordBatch {
location: Location,
source: ArrowError,
},
#[snafu(display(
"Failed to write to buffer, location: {}, source: {}",
location,
source
))]
WriteBuffer {
location: Location,
source: common_datasource::error::Error,
},
#[snafu(display(
"Failed to write parquet file, path: {}, location: {}, source: {}",
path,
location,
source
))]
WriteParquet {
path: String,
location: Location,
source: parquet::errors::ParquetError,
},
#[snafu(display(
"Failed to read parquet file, path: {}, location: {}, source: {}",
path,
location,
source
))]
ReadParquet {
path: String,
source: parquet::errors::ParquetError,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -126,12 +171,15 @@ impl ErrorExt for Error {
use Error::*;
match self {
OpenDal { .. } => StatusCode::StorageUnavailable,
OpenDal { .. } | WriteParquet { .. } | ReadParquet { .. } => {
StatusCode::StorageUnavailable
}
CompressObject { .. }
| DecompressObject { .. }
| SerdeJson { .. }
| Utf8 { .. }
| RegionExists { .. } => StatusCode::Unexpected,
| RegionExists { .. }
| NewRecordBatch { .. } => StatusCode::Unexpected,
InvalidScanIndex { .. }
| InitialMetadata { .. }
| InvalidMeta { .. }
@@ -139,6 +187,7 @@ impl ErrorExt for Error {
RegionMetadataNotFound { .. } | Join { .. } | WorkerStopped { .. } | Recv { .. } => {
StatusCode::Internal
}
WriteBuffer { source, .. } => source.status_code(),
}
}


@@ -31,10 +31,11 @@ pub mod manifest;
pub mod memtable;
#[allow(dead_code)]
pub mod metadata;
pub mod read;
#[allow(dead_code)]
mod region;
#[allow(dead_code)]
pub(crate) mod sst;
pub mod sst;
#[allow(dead_code)]
mod worker;


@@ -24,7 +24,7 @@ use serde::{Deserialize, Deserializer, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{ColumnId, RegionId};
use crate::error::{InvalidMetaSnafu, InvalidSchemaSnafu, Result};
use crate::error::{InvalidMetaSnafu, InvalidSchemaSnafu, Result, SerdeJsonSnafu};
use crate::region::VersionNumber;
/// Initial version number of a new region.
@@ -114,6 +114,13 @@ impl<'de> Deserialize<'de> for RegionMetadata {
}
}
impl RegionMetadata {
/// Encode the metadata to a JSON string.
pub fn to_json(&self) -> Result<String> {
serde_json::to_string(&self).context(SerdeJsonSnafu)
}
}
/// Fields skipped in serialization.
struct SkippedFields {
/// Last schema.

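The new `RegionMetadata::to_json` is the encode half of the metadata round trip: the writer stores the JSON string under `PARQUET_METADATA_KEY`, and a reader can recover the metadata through the custom `Deserialize` impl shown above. A minimal sketch of the decode side, assuming `serde_json::from_str` pairs with that impl (the helper name is hypothetical; the reader-side decoding is still a TODO in this commit):

// Hypothetical helper, not part of this commit: decode RegionMetadata back from the
// JSON string stored under PARQUET_METADATA_KEY in the parquet key-value metadata.
fn decode_region_metadata(json: &str) -> serde_json::Result<RegionMetadata> {
    serde_json::from_str(json)
}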
src/mito2/src/read.rs

@@ -0,0 +1,140 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Common structs and utilities for reading data.
use async_trait::async_trait;
use common_time::Timestamp;
use datatypes::vectors::VectorRef;
use crate::error::Result;
use crate::metadata::RegionMetadataRef;
/// Storage internal representation of a batch of rows.
///
/// The structure of [Batch] is still unstable; all pub fields may change.
#[derive(Debug, Default, PartialEq, Eq, Clone)]
pub struct Batch {
/// Rows organized in columnar format.
pub columns: Vec<VectorRef>,
}
impl Batch {
/// Create a new `Batch` from `columns`.
///
/// # Panics
/// Panics if vectors in `columns` have different length.
pub fn new(columns: Vec<VectorRef>) -> Batch {
Self::assert_columns(&columns);
Batch { columns }
}
/// Returns number of columns in the batch.
pub fn num_columns(&self) -> usize {
self.columns.len()
}
/// Returns number of rows in the batch.
pub fn num_rows(&self) -> usize {
self.columns.get(0).map(|v| v.len()).unwrap_or(0)
}
/// Returns true if the number of rows in the batch is 0.
pub fn is_empty(&self) -> bool {
self.num_rows() == 0
}
/// Slice the batch, returning a new batch.
///
/// # Panics
/// Panics if `offset + length > self.num_rows()`.
pub fn slice(&self, offset: usize, length: usize) -> Batch {
let columns = self
.columns
.iter()
.map(|v| v.slice(offset, length))
.collect();
Batch { columns }
}
fn assert_columns(columns: &[VectorRef]) {
if columns.is_empty() {
return;
}
let length = columns[0].len();
assert!(columns.iter().all(|col| col.len() == length));
}
}
/// Collected [Source] statistics.
#[derive(Debug, Clone)]
pub struct SourceStats {
/// Number of rows fetched.
pub num_rows: usize,
/// Min timestamp from fetched batches.
///
/// If no rows are fetched, the timestamp is i64::MIN.
pub min_timestamp: Timestamp,
/// Max timestamp from fetched batches.
///
/// If no rows are fetched, the timestamp is i64::MAX.
pub max_timestamp: Timestamp,
}
/// Async [Batch] reader and iterator wrapper.
///
/// This is the data source for SST writers or internal readers.
pub enum Source {}
impl Source {
/// Returns next [Batch] from this data source.
pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
unimplemented!()
}
/// Returns the metadata of the source region.
pub(crate) fn metadata(&self) -> RegionMetadataRef {
unimplemented!()
}
/// Returns statistics of fetched batches.
pub(crate) fn stats(&self) -> SourceStats {
unimplemented!()
}
}
/// Async batch reader.
#[async_trait]
pub trait BatchReader: Send {
/// Fetch next [Batch].
///
/// Returns `Ok(None)` when the reader has reached its end; calling `next_batch()`
/// again will not return any more batches.
///
/// If `Err` is returned, the caller should not call this method again; the implementor
/// may or may not panic in that case.
async fn next_batch(&mut self) -> Result<Option<Batch>>;
}
/// Pointer to [BatchReader].
pub type BoxedBatchReader = Box<dyn BatchReader>;
#[async_trait::async_trait]
impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
async fn next_batch(&mut self) -> Result<Option<Batch>> {
(**self).next_batch().await
}
}


@@ -15,4 +15,6 @@
//! Sorted strings tables.
pub mod file;
pub mod parquet;
mod stream_writer;
pub(crate) mod version;


@@ -20,6 +20,7 @@ use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use common_time::Timestamp;
use object_store::util::join_path;
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, Snafu};
use store_api::storage::RegionId;
@@ -52,7 +53,7 @@ impl FileId {
/// Appends `.parquet` to the file id to form a complete file name
pub fn as_parquet(&self) -> String {
format!("{}{}", self.0.hyphenated(), ".parquet")
format!("{}{}", self, ".parquet")
}
}
@@ -109,6 +110,18 @@ impl fmt::Debug for FileHandle {
}
}
impl FileHandle {
/// Returns the file id.
pub fn file_id(&self) -> FileId {
self.inner.meta.file_id
}
/// Returns the complete file path of the file.
pub fn file_path(&self, file_dir: &str) -> String {
join_path(file_dir, &self.file_id().as_parquet())
}
}
/// Inner data of [FileHandle].
///
/// Contains meta of the file, and other mutable info like whether the file is compacting.


@@ -0,0 +1,44 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! SST in parquet format.
mod reader;
mod writer;
use common_base::readable_size::ReadableSize;
use crate::sst::file::FileTimeRange;
/// Key of metadata in parquet SST.
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";
/// Parquet write options.
#[derive(Debug)]
pub struct WriteOptions {
/// Buffer size for async writer.
pub write_buffer_size: ReadableSize,
/// Row group size.
pub row_group_size: usize,
}
/// Parquet SST info returned by the writer.
pub struct SstInfo {
/// Time range of the SST.
pub time_range: FileTimeRange,
/// File size in bytes.
pub file_size: u64,
/// Number of rows.
pub num_rows: usize,
}
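`WriteOptions` has no `Default` impl in this commit, so callers construct it explicitly. An illustrative construction follows; the concrete sizes are assumptions, not values defined by the commit:

// Illustrative only: the buffer and row group sizes below are placeholders.
fn example_write_options() -> WriteOptions {
    WriteOptions {
        write_buffer_size: ReadableSize::mb(8),
        row_group_size: 4096,
    }
}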


@@ -0,0 +1,171 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Parquet reader.
use async_compat::CompatExt;
use async_trait::async_trait;
use common_time::range::TimestampRange;
use datatypes::arrow::record_batch::RecordBatch;
use futures::stream::BoxStream;
use futures::TryStreamExt;
use object_store::ObjectStore;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::errors::ParquetError;
use snafu::ResultExt;
use table::predicate::Predicate;
use tokio::io::BufReader;
use crate::error::{OpenDalSnafu, ReadParquetSnafu, Result};
use crate::read::{Batch, BatchReader};
use crate::sst::file::FileHandle;
/// Parquet SST reader builder.
pub struct ParquetReaderBuilder {
file_dir: String,
file_handle: FileHandle,
object_store: ObjectStore,
predicate: Option<Predicate>,
time_range: Option<TimestampRange>,
}
impl ParquetReaderBuilder {
/// Returns a new [ParquetReaderBuilder] to read a specific SST.
pub fn new(
file_dir: String,
file_handle: FileHandle,
object_store: ObjectStore,
) -> ParquetReaderBuilder {
ParquetReaderBuilder {
file_dir,
file_handle,
object_store,
predicate: None,
time_range: None,
}
}
/// Attaches the predicate to the builder.
pub fn predicate(mut self, predicate: Predicate) -> ParquetReaderBuilder {
self.predicate = Some(predicate);
self
}
/// Attaches the time range to the builder.
pub fn time_range(mut self, time_range: TimestampRange) -> ParquetReaderBuilder {
self.time_range = Some(time_range);
self
}
/// Builds a [ParquetReader].
pub fn build(self) -> ParquetReader {
let file_path = self.file_handle.file_path(&self.file_dir);
ParquetReader {
file_path,
file_handle: self.file_handle,
object_store: self.object_store,
predicate: self.predicate,
time_range: self.time_range,
stream: None,
}
}
}
type BoxedRecordBatchStream = BoxStream<'static, std::result::Result<RecordBatch, ParquetError>>;
/// Parquet batch reader.
pub struct ParquetReader {
/// Path of the file.
file_path: String,
/// SST file to read.
///
/// Holds the file handle so the file purger does not purge the file while it is in use.
file_handle: FileHandle,
object_store: ObjectStore,
/// Predicate to push down.
predicate: Option<Predicate>,
/// Time range to filter.
time_range: Option<TimestampRange>,
/// Inner parquet record batch stream.
stream: Option<BoxedRecordBatchStream>,
}
impl ParquetReader {
/// Initializes the reader and the parquet stream.
async fn maybe_init(&mut self) -> Result<()> {
if self.stream.is_some() {
// Already initialized.
return Ok(());
}
let reader = self
.object_store
.reader(&self.file_path)
.await
.context(OpenDalSnafu)?
.compat();
let buf_reader = BufReader::new(reader);
let mut builder = ParquetRecordBatchStreamBuilder::new(buf_reader)
.await
.context(ReadParquetSnafu {
path: &self.file_path,
})?;
// TODO(yingwen): Decode region metadata, create read adapter.
// Prune row groups by metadata.
if let Some(predicate) = &self.predicate {
let pruned_row_groups = predicate
.prune_row_groups(builder.metadata().row_groups())
.into_iter()
.enumerate()
.filter_map(|(idx, valid)| if valid { Some(idx) } else { None })
.collect::<Vec<_>>();
builder = builder.with_row_groups(pruned_row_groups);
}
// TODO(yingwen): Projection.
let stream = builder.build().context(ReadParquetSnafu {
path: &self.file_path,
})?;
self.stream = Some(Box::pin(stream));
Ok(())
}
/// Converts an arrow [RecordBatch] into our [Batch].
fn convert_arrow_record_batch(&self, _record_batch: RecordBatch) -> Result<Batch> {
unimplemented!()
}
}
#[async_trait]
impl BatchReader for ParquetReader {
async fn next_batch(&mut self) -> Result<Option<Batch>> {
self.maybe_init().await?;
self.stream
.as_mut()
.unwrap()
.try_next()
.await
.context(ReadParquetSnafu {
path: &self.file_path,
})?
.map(|rb| self.convert_arrow_record_batch(rb))
.transpose()
}
}
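A hedged usage sketch of the builder: the directory string, predicate, and file handle come from the caller, and `next_batch` lazily initializes the parquet stream on the first call. The helper itself is hypothetical and only exercises the APIs added in this file:

// Hypothetical caller: read every batch of one SST file with a pushed-down predicate.
async fn scan_sst(
    file_dir: String,
    file_handle: FileHandle,
    object_store: ObjectStore,
    predicate: Predicate,
) -> Result<Vec<Batch>> {
    let mut reader = ParquetReaderBuilder::new(file_dir, file_handle, object_store)
        .predicate(predicate)
        .build();
    let mut batches = Vec::new();
    while let Some(batch) = reader.next_batch().await? {
        batches.push(batch);
    }
    Ok(batches)
}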


@@ -0,0 +1,104 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Parquet writer.
use common_telemetry::logging;
use object_store::ObjectStore;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::WriterProperties;
use crate::error::Result;
use crate::read::Source;
use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
use crate::sst::stream_writer::BufferedWriter;
/// Parquet SST writer.
pub struct ParquetWriter<'a> {
/// SST output file path.
file_path: &'a str,
/// Input data source.
source: Source,
object_store: ObjectStore,
}
impl<'a> ParquetWriter<'a> {
/// Creates a new parquet SST writer.
pub fn new(file_path: &'a str, source: Source, object_store: ObjectStore) -> ParquetWriter {
ParquetWriter {
file_path,
source,
object_store,
}
}
/// Iterates the source and writes all rows to the Parquet file.
///
/// Returns the [SstInfo] if the SST is written.
pub async fn write_all(&mut self, opts: &WriteOptions) -> Result<Option<SstInfo>> {
let metadata = self.source.metadata();
let json = metadata.to_json()?;
let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
// FIXME(yingwen): encode metadata into key value.
let props_builder = WriterProperties::builder()
.set_key_value_metadata(Some(vec![key_value_meta]))
.set_compression(Compression::ZSTD(ZstdLevel::default()))
.set_encoding(Encoding::PLAIN)
.set_max_row_group_size(opts.row_group_size);
// TODO(yingwen): Set column encoding for internal columns and timestamp.
// e.g. Use DELTA_BINARY_PACKED and disable dictionary for sequence.
let writer_props = props_builder.build();
let mut buffered_writer = BufferedWriter::try_new(
self.file_path.to_string(),
self.object_store.clone(),
&metadata.schema,
Some(writer_props),
opts.write_buffer_size.as_bytes() as usize,
)
.await?;
while let Some(batch) = self.source.next_batch().await? {
buffered_writer.write(&batch).await?;
}
// Get stats from the source.
let stats = self.source.stats();
if stats.num_rows == 0 {
logging::debug!(
"No data written, try to stop the writer: {}",
self.file_path
);
buffered_writer.close().await?;
return Ok(None);
}
let (_file_meta, file_size) = buffered_writer.close().await?;
let time_range = (stats.min_timestamp, stats.max_timestamp);
// object_store.write will make sure all bytes are written or an error is raised.
Ok(Some(SstInfo {
time_range,
file_size,
num_rows: stats.num_rows,
}))
}
}
// TODO(yingwen): Port tests.
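Because `write_all` now takes `&mut self` (per the commit message), the writer must be bound mutably at the call site. A hedged sketch of such a caller; the helper and its arguments are illustrative, only the `ParquetWriter` API comes from this commit:

// Hypothetical caller: flush a Source into one SST file and return its SstInfo.
async fn write_sst(
    file_path: &str,
    source: Source,
    object_store: ObjectStore,
    opts: &WriteOptions,
) -> Result<Option<SstInfo>> {
    let mut writer = ParquetWriter::new(file_path, source, object_store);
    writer.write_all(opts).await
}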


@@ -0,0 +1,120 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::future::Future;
use std::pin::Pin;
use common_datasource::buffered_writer::LazyBufferedWriter;
use common_datasource::share_buffer::SharedBuffer;
use datatypes::arrow;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::schema::SchemaRef;
use object_store::ObjectStore;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use parquet::format::FileMetaData;
use snafu::ResultExt;
use crate::error;
use crate::error::{NewRecordBatchSnafu, WriteParquetSnafu};
use crate::read::Batch;
/// Parquet writer that buffers row groups in memory and writes the buffered data to the
/// underlying storage in chunks to reduce memory consumption.
pub struct BufferedWriter {
inner: InnerBufferedWriter,
arrow_schema: arrow::datatypes::SchemaRef,
}
type InnerBufferedWriter = LazyBufferedWriter<
object_store::Writer,
ArrowWriter<SharedBuffer>,
Box<
dyn FnMut(
String,
) -> Pin<
Box<
dyn Future<Output = common_datasource::error::Result<object_store::Writer>>
+ Send,
>,
> + Send,
>,
>;
impl BufferedWriter {
pub async fn try_new(
path: String,
store: ObjectStore,
schema: &SchemaRef,
props: Option<WriterProperties>,
buffer_threshold: usize,
) -> error::Result<Self> {
let arrow_schema = schema.arrow_schema();
let buffer = SharedBuffer::with_capacity(buffer_threshold);
let arrow_writer = ArrowWriter::try_new(buffer.clone(), arrow_schema.clone(), props)
.context(WriteParquetSnafu { path: &path })?;
Ok(Self {
inner: LazyBufferedWriter::new(
buffer_threshold,
buffer,
arrow_writer,
&path,
Box::new(move |path| {
let store = store.clone();
Box::pin(async move {
store
.writer(&path)
.await
.context(common_datasource::error::WriteObjectSnafu { path })
})
}),
),
arrow_schema: arrow_schema.clone(),
})
}
/// Writes a batch to the underlying buffered writer.
pub async fn write(&mut self, batch: &Batch) -> error::Result<()> {
let arrow_batch = RecordBatch::try_new(
self.arrow_schema.clone(),
batch
.columns
.iter()
.map(|v| v.to_arrow_array())
.collect::<Vec<_>>(),
)
.context(NewRecordBatchSnafu)?;
self.inner
.write(&arrow_batch)
.await
.context(error::WriteBufferSnafu)?;
self.inner
.try_flush(false)
.await
.context(error::WriteBufferSnafu)?;
Ok(())
}
/// Closes the parquet writer, returning the parquet file metadata and the written file size.
pub async fn close(self) -> error::Result<(FileMetaData, u64)> {
self.inner
.close_with_arrow_writer()
.await
.context(error::WriteBufferSnafu)
}
}


@@ -43,6 +43,15 @@ pub fn join_dir(parent: &str, child: &str) -> String {
opendal::raw::normalize_root(&output)
}
/// Pushes `child` to the `parent` dir and normalizes the output path.
///
/// - A path that ends with `/` is a directory path.
/// - Otherwise, it is a file path.
pub fn join_path(parent: &str, child: &str) -> String {
let output = format!("{parent}/{child}");
opendal::raw::normalize_path(&output)
}
#[cfg(test)]
mod tests {
use super::*;
@@ -55,7 +64,7 @@ mod tests {
}
#[test]
fn test_join_paths() {
fn test_join_dir() {
assert_eq!("/", join_dir("", ""));
assert_eq!("/", join_dir("/", ""));
assert_eq!("/", join_dir("", "/"));
@@ -67,4 +76,18 @@ mod tests {
assert_eq!("/a/b/c/", join_dir("/a/b", "/c/"));
assert_eq!("/a/b/c/", join_dir("/a/b", "//c"));
}
#[test]
fn test_join_path() {
assert_eq!("/", join_path("", ""));
assert_eq!("/", join_path("/", ""));
assert_eq!("/", join_path("", "/"));
assert_eq!("/", join_path("/", "/"));
assert_eq!("a/", join_path("a", ""));
assert_eq!("a/b/c.txt", join_path("a/b", "c.txt"));
assert_eq!("a/b/c.txt", join_path("/a/b", "c.txt"));
assert_eq!("a/b/c/", join_path("/a/b", "c/"));
assert_eq!("a/b/c/", join_path("/a/b", "/c/"));
assert_eq!("a/b/c.txt", join_path("/a/b", "//c.txt"));
}
}