build(deps): upgrade opendal to 0.46 (#4037)

* build(deps): upgrade opendal to 0.46

Signed-off-by: tison <wander4096@gmail.com>

* migrate writes

Signed-off-by: tison <wander4096@gmail.com>

* migrate reads

Signed-off-by: tison <wander4096@gmail.com>

* fixup object safety

Signed-off-by: tison <wander4096@gmail.com>

* fixup names

Signed-off-by: tison <wander4096@gmail.com>

* fixup compilation

Signed-off-by: tison <wander4096@gmail.com>

* fixup compilation

Signed-off-by: tison <wander4096@gmail.com>

* a few Buffer to Vec

Signed-off-by: tison <wander4096@gmail.com>

* Make greptime buildable with opendal 0.46 (#5)

Signed-off-by: Xuanwo <github@xuanwo.io>

* fixup toml check

Signed-off-by: tison <wander4096@gmail.com>

* test_orc_opener

Signed-off-by: tison <wander4096@gmail.com>

* Fix lru cache (#6)

Signed-off-by: Xuanwo <github@xuanwo.io>

* clippy

Signed-off-by: tison <wander4096@gmail.com>

* improve comments

Signed-off-by: tison <wander4096@gmail.com>

* address comments

Signed-off-by: tison <wander4096@gmail.com>

* reduce buf copy

Signed-off-by: tison <wander4096@gmail.com>

* upgrade to reqwest 0.12

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
Signed-off-by: Xuanwo <github@xuanwo.io>
Co-authored-by: Xuanwo <github@xuanwo.io>
This commit is contained in:
tison
2024-05-27 17:12:23 +08:00
committed by GitHub
parent 20ce7d428d
commit f9db5ff0d6
31 changed files with 631 additions and 443 deletions

View File

@@ -92,34 +92,44 @@ impl CompressionType {
macro_rules! impl_compression_type {
($(($enum_item:ident, $prefix:ident)),*) => {
paste::item! {
use bytes::{Buf, BufMut, BytesMut};
impl CompressionType {
pub async fn encode(&self, content: impl AsRef<[u8]>) -> io::Result<Vec<u8>> {
pub async fn encode<B: Buf>(&self, mut content: B) -> io::Result<Vec<u8>> {
match self {
$(
CompressionType::$enum_item => {
let mut buffer = Vec::with_capacity(content.as_ref().len());
let mut buffer = Vec::with_capacity(content.remaining());
let mut encoder = write::[<$prefix Encoder>]::new(&mut buffer);
encoder.write_all(content.as_ref()).await?;
encoder.write_all_buf(&mut content).await?;
encoder.shutdown().await?;
Ok(buffer)
}
)*
CompressionType::Uncompressed => Ok(content.as_ref().to_vec()),
CompressionType::Uncompressed => {
let mut bs = BytesMut::with_capacity(content.remaining());
bs.put(content);
Ok(bs.to_vec())
},
}
}
pub async fn decode(&self, content: impl AsRef<[u8]>) -> io::Result<Vec<u8>> {
pub async fn decode<B: Buf>(&self, mut content: B) -> io::Result<Vec<u8>> {
match self {
$(
CompressionType::$enum_item => {
let mut buffer = Vec::with_capacity(content.as_ref().len() * 2);
let mut buffer = Vec::with_capacity(content.remaining() * 2);
let mut encoder = write::[<$prefix Decoder>]::new(&mut buffer);
encoder.write_all(content.as_ref()).await?;
encoder.write_all_buf(&mut content).await?;
encoder.shutdown().await?;
Ok(buffer)
}
)*
CompressionType::Uncompressed => Ok(content.as_ref().to_vec()),
CompressionType::Uncompressed => {
let mut bs = BytesMut::with_capacity(content.remaining());
bs.put(content);
Ok(bs.to_vec())
},
}
}
@@ -151,13 +161,13 @@ macro_rules! impl_compression_type {
$(
#[tokio::test]
async fn [<test_ $enum_item:lower _compression>]() {
let string = "foo_bar".as_bytes().to_vec();
let string = "foo_bar".as_bytes();
let compress = CompressionType::$enum_item
.encode(&string)
.encode(string)
.await
.unwrap();
let decompress = CompressionType::$enum_item
.decode(&compress)
.decode(compress.as_slice())
.await
.unwrap();
assert_eq!(decompress, string);
@@ -165,13 +175,13 @@ macro_rules! impl_compression_type {
#[tokio::test]
async fn test_uncompression() {
let string = "foo_bar".as_bytes().to_vec();
let string = "foo_bar".as_bytes();
let compress = CompressionType::Uncompressed
.encode(&string)
.encode(string)
.await
.unwrap();
let decompress = CompressionType::Uncompressed
.decode(&compress)
.decode(compress.as_slice())
.await
.unwrap();
assert_eq!(decompress, string);

View File

@@ -36,6 +36,7 @@ use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;
use object_store::ObjectStore;
use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncWriteCompatExt;
use self::csv::CsvFormat;
use self::json::JsonFormat;
@@ -146,7 +147,8 @@ pub fn open_with_decoder<T: ArrowDecoder, F: Fn() -> DataFusionResult<T>>(
let reader = object_store
.reader(&path)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
.map_err(|e| DataFusionError::External(Box::new(e)))?
.into_bytes_stream(..);
let mut upstream = compression_type.convert_stream(reader).fuse();
@@ -203,6 +205,7 @@ pub async fn stream_to_file<T: DfRecordBatchEncoder, U: Fn(SharedBuffer) -> T>(
.writer_with(&path)
.concurrent(concurrency)
.await
.map(|v| v.into_futures_async_write().compat_write())
.context(error::WriteObjectSnafu { path })
});

View File

@@ -29,6 +29,7 @@ use datafusion::physical_plan::SendableRecordBatchStream;
use derive_builder::Builder;
use object_store::ObjectStore;
use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio_util::io::SyncIoBridge;
use super::stream_to_file;
@@ -164,10 +165,16 @@ impl FileOpener for CsvOpener {
#[async_trait]
impl FileFormat for CsvFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
let meta = store
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;
let reader = store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?;
.context(error::ReadObjectSnafu { path })?
.into_futures_async_read(0..meta.content_length())
.compat();
let decoded = self.compression_type.convert_async_read(reader);

View File

@@ -31,6 +31,7 @@ use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::physical_plan::SendableRecordBatchStream;
use object_store::ObjectStore;
use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio_util::io::SyncIoBridge;
use super::stream_to_file;
@@ -82,10 +83,16 @@ impl Default for JsonFormat {
#[async_trait]
impl FileFormat for JsonFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
let meta = store
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;
let reader = store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?;
.context(error::ReadObjectSnafu { path })?
.into_futures_async_read(0..meta.content_length())
.compat();
let decoded = self.compression_type.convert_async_read(reader);

View File

@@ -16,15 +16,17 @@ use std::sync::Arc;
use arrow_schema::{ArrowError, Schema, SchemaRef};
use async_trait::async_trait;
use bytes::Bytes;
use common_recordbatch::adapter::RecordBatchStreamTypeAdapter;
use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
use datafusion::error::{DataFusionError, Result as DfResult};
use futures::{StreamExt, TryStreamExt};
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use object_store::ObjectStore;
use orc_rust::arrow_reader::ArrowReaderBuilder;
use orc_rust::async_arrow_reader::ArrowStreamReader;
use orc_rust::reader::AsyncChunkReader;
use snafu::ResultExt;
use tokio::io::{AsyncRead, AsyncSeek};
use crate::error::{self, Result};
use crate::file_format::FileFormat;
@@ -32,18 +34,49 @@ use crate::file_format::FileFormat;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct OrcFormat;
pub async fn new_orc_stream_reader<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
reader: R,
) -> Result<ArrowStreamReader<R>> {
/// Pairs an object-store reader with a byte length supplied by the caller,
/// so both can be handed to code that expects an `AsyncChunkReader`.
#[derive(Clone)]
pub struct ReaderAdapter {
/// Reader that `get_bytes` issues range reads against.
reader: object_store::Reader,
/// Length reported by `len()`; stored exactly as passed to `new`, never recomputed.
len: u64,
}
impl ReaderAdapter {
pub fn new(reader: object_store::Reader, len: u64) -> Self {
Self { reader, len }
}
}
/// Serves chunk requests by issuing range reads on the wrapped object-store reader.
impl AsyncChunkReader for ReaderAdapter {
    /// Returns the length given at construction time; performs no I/O.
    fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>> {
        let len = self.len;
        async move { Ok(len) }.boxed()
    }

    /// Reads the byte range `offset_from_start..offset_from_start + length`.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` if the end of the range does not fit in a `u64`,
    /// and propagates any error produced by the underlying reader.
    fn get_bytes(
        &mut self,
        offset_from_start: u64,
        length: u64,
    ) -> BoxFuture<'_, std::io::Result<Bytes>> {
        async move {
            // Offsets and lengths come from file metadata; an unchecked `+` would
            // panic in debug builds and wrap to an inverted range in release builds.
            let end = offset_from_start.checked_add(length).ok_or_else(|| {
                std::io::Error::new(
                    std::io::ErrorKind::InvalidInput,
                    format!(
                        "byte range overflows u64: offset {}, length {}",
                        offset_from_start, length
                    ),
                )
            })?;
            let buffer = self.reader.read(offset_from_start..end).await?;
            Ok(buffer.to_bytes())
        }
        .boxed()
    }
}
pub async fn new_orc_stream_reader(
reader: ReaderAdapter,
) -> Result<ArrowStreamReader<ReaderAdapter>> {
let reader_build = ArrowReaderBuilder::try_new_async(reader)
.await
.context(error::OrcReaderSnafu)?;
Ok(reader_build.build_async())
}
pub async fn infer_orc_schema<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
reader: R,
) -> Result<Schema> {
/// Opens an ORC stream over `reader` and returns an owned copy of its Arrow schema.
///
/// # Errors
///
/// Propagates any failure from `new_orc_stream_reader`.
pub async fn infer_orc_schema(reader: ReaderAdapter) -> Result<Schema> {
    let stream_reader = new_orc_stream_reader(reader).await?;
    let schema = stream_reader.schema();
    Ok(schema.as_ref().clone())
}
@@ -51,13 +84,15 @@ pub async fn infer_orc_schema<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>
#[async_trait]
impl FileFormat for OrcFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
let meta = store
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;
let reader = store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?;
let schema = infer_orc_schema(reader).await?;
let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length())).await?;
Ok(schema)
}
}
@@ -97,15 +132,23 @@ impl FileOpener for OrcOpener {
};
let projection = self.projection.clone();
Ok(Box::pin(async move {
let reader = object_store
.reader(meta.location().to_string().as_str())
let path = meta.location().to_string();
let meta = object_store
.stat(&path)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let stream_reader = new_orc_stream_reader(reader)
let reader = object_store
.reader(&path)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let stream_reader =
new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length()))
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let stream =
RecordBatchStreamTypeAdapter::new(projected_schema, stream_reader, projection);

View File

@@ -29,10 +29,11 @@ use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::future::BoxFuture;
use futures::StreamExt;
use object_store::{ObjectStore, Reader, Writer};
use object_store::{FuturesAsyncReader, ObjectStore};
use parquet::basic::{Compression, ZstdLevel};
use parquet::file::properties::WriterProperties;
use snafu::ResultExt;
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
use crate::buffered_writer::{ArrowWriterCloser, DfRecordBatchEncoder, LazyBufferedWriter};
use crate::error::{self, Result};
@@ -45,10 +46,16 @@ pub struct ParquetFormat {}
#[async_trait]
impl FileFormat for ParquetFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
let meta = store
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;
let mut reader = store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?;
.context(error::ReadObjectSnafu { path })?
.into_futures_async_read(0..meta.content_length())
.compat();
let metadata = reader
.get_metadata()
@@ -98,7 +105,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
pub struct LazyParquetFileReader {
object_store: ObjectStore,
reader: Option<Reader>,
reader: Option<Compat<FuturesAsyncReader>>,
path: String,
}
@@ -114,7 +121,13 @@ impl LazyParquetFileReader {
/// Must initialize the reader, or throw an error from the future.
async fn maybe_initialize(&mut self) -> result::Result<(), object_store::Error> {
if self.reader.is_none() {
let reader = self.object_store.reader(&self.path).await?;
let meta = self.object_store.stat(&self.path).await?;
let reader = self
.object_store
.reader(&self.path)
.await?
.into_futures_async_read(0..meta.content_length())
.compat();
self.reader = Some(reader);
}
@@ -167,16 +180,17 @@ pub struct BufferedWriter {
}
type InnerBufferedWriter = LazyBufferedWriter<
object_store::Writer,
Compat<object_store::FuturesAsyncWriter>,
ArrowWriter<SharedBuffer>,
impl Fn(String) -> BoxFuture<'static, Result<Writer>>,
impl Fn(String) -> BoxFuture<'static, Result<Compat<object_store::FuturesAsyncWriter>>>,
>;
impl BufferedWriter {
fn make_write_factory(
store: ObjectStore,
concurrency: usize,
) -> impl Fn(String) -> BoxFuture<'static, Result<Writer>> {
) -> impl Fn(String) -> BoxFuture<'static, Result<Compat<object_store::FuturesAsyncWriter>>>
{
move |path| {
let store = store.clone();
Box::pin(async move {
@@ -184,6 +198,7 @@ impl BufferedWriter {
.writer_with(&path)
.concurrent(concurrency)
.await
.map(|v| v.into_futures_async_write().compat_write())
.context(error::WriteObjectSnafu { path })
})
}

View File

@@ -120,7 +120,7 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi
let written = tmp_store.read(&output_path).await.unwrap();
let origin = store.read(origin_path).await.unwrap();
assert_eq_lines(written, origin);
assert_eq_lines(written.to_vec(), origin.to_vec());
}
pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usize) -> usize) {
@@ -158,7 +158,7 @@ pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usiz
let written = tmp_store.read(&output_path).await.unwrap();
let origin = store.read(origin_path).await.unwrap();
assert_eq_lines(written, origin);
assert_eq_lines(written.to_vec(), origin.to_vec());
}
// Ignore the CRLF difference across operating systems.

View File

@@ -179,7 +179,7 @@ impl StateStore for ObjectStateStore {
))
})
.context(ListStateSnafu { path: key })?;
yield (key.into(), value);
yield (key.into(), value.to_vec());
}
}
});