mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-26 08:00:01 +00:00
Compare commits
4 Commits
v0.2.0-nig
...
v0.2.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f7d2ede4a4 | ||
|
|
df6ebbf934 | ||
|
|
719fbf2f3a | ||
|
|
cc14dea913 |
1
.github/ISSUE_TEMPLATE/bug_report.yml
vendored
1
.github/ISSUE_TEMPLATE/bug_report.yml
vendored
@@ -81,5 +81,6 @@ body:
|
||||
Please walk us through and provide steps and details on how
|
||||
to reproduce the issue. If possible, provide scripts that we
|
||||
can run to trigger the bug.
|
||||
render: bash
|
||||
validations:
|
||||
required: true
|
||||
|
||||
74
.github/workflows/release.yml
vendored
74
.github/workflows/release.yml
vendored
@@ -319,87 +319,25 @@ jobs:
|
||||
SCHEDULED_BUILD_VERSION=${{ env.SCHEDULED_BUILD_VERSION_PREFIX }}-${{ env.SCHEDULED_PERIOD }}-$buildTime
|
||||
echo "SCHEDULED_BUILD_VERSION=${SCHEDULED_BUILD_VERSION}" >> $GITHUB_ENV
|
||||
|
||||
# Only publish release when the release tag is like v1.0.0, v1.0.1, v1.0.2, etc.
|
||||
- name: Set whether it is the latest release
|
||||
run: |
|
||||
if [[ "${{ github.ref_name }}" =~ ^v[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
|
||||
echo "prerelease=false" >> $GITHUB_ENV
|
||||
echo "makeLatest=true" >> $GITHUB_ENV
|
||||
else
|
||||
echo "prerelease=true" >> $GITHUB_ENV
|
||||
echo "makeLatest=false" >> $GITHUB_ENV
|
||||
fi
|
||||
|
||||
- name: Create scheduled build git tag
|
||||
if: github.event_name == 'schedule'
|
||||
run: |
|
||||
git tag ${{ env.SCHEDULED_BUILD_VERSION }}
|
||||
|
||||
- name: Publish scheduled release # configure the different release title and tags.
|
||||
uses: ncipollo/release-action@v1
|
||||
uses: softprops/action-gh-release@v1
|
||||
if: github.event_name == 'schedule'
|
||||
with:
|
||||
name: "Release ${{ env.SCHEDULED_BUILD_VERSION }}"
|
||||
prerelease: ${{ env.prerelease }}
|
||||
makeLatest: ${{ env.makeLatest }}
|
||||
tag: ${{ env.SCHEDULED_BUILD_VERSION }}
|
||||
generateReleaseNotes: true
|
||||
artifacts: |
|
||||
tag_name: ${{ env.SCHEDULED_BUILD_VERSION }}
|
||||
generate_release_notes: true
|
||||
files: |
|
||||
**/greptime-*
|
||||
|
||||
- name: Publish release
|
||||
uses: ncipollo/release-action@v1
|
||||
uses: softprops/action-gh-release@v1
|
||||
if: github.event_name != 'schedule'
|
||||
with:
|
||||
name: "Release ${{ github.ref_name }}"
|
||||
prerelease: ${{ env.prerelease }}
|
||||
makeLatest: ${{ env.makeLatest }}
|
||||
generateReleaseNotes: true
|
||||
artifacts: |
|
||||
files: |
|
||||
**/greptime-*
|
||||
|
||||
docker-push-uhub:
|
||||
name: Push docker image to UCloud Container Registry
|
||||
needs: [docker]
|
||||
runs-on: ubuntu-latest
|
||||
if: github.repository == 'GreptimeTeam/greptimedb' && github.event_name != 'workflow_dispatch'
|
||||
# Push to uhub may fail(500 error), but we don't want to block the release process. The failed job will be retried manually.
|
||||
continue-on-error: true
|
||||
steps:
|
||||
- name: Checkout sources
|
||||
uses: actions/checkout@v3
|
||||
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@v2
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v2
|
||||
|
||||
- name: Login to UCloud Container Registry
|
||||
uses: docker/login-action@v2
|
||||
with:
|
||||
registry: uhub.service.ucloud.cn
|
||||
username: ${{ secrets.UCLOUD_USERNAME }}
|
||||
password: ${{ secrets.UCLOUD_PASSWORD }}
|
||||
|
||||
- name: Configure scheduled build image tag # the tag would be ${SCHEDULED_BUILD_VERSION_PREFIX}-YYYYMMDD-${SCHEDULED_PERIOD}
|
||||
shell: bash
|
||||
if: github.event_name == 'schedule'
|
||||
run: |
|
||||
buildTime=`date "+%Y%m%d"`
|
||||
SCHEDULED_BUILD_VERSION=${{ env.SCHEDULED_BUILD_VERSION_PREFIX }}-$buildTime-${{ env.SCHEDULED_PERIOD }}
|
||||
echo "IMAGE_TAG=${SCHEDULED_BUILD_VERSION:1}" >> $GITHUB_ENV
|
||||
|
||||
- name: Configure tag # If the release tag is v0.1.0, then the image version tag will be 0.1.0.
|
||||
shell: bash
|
||||
if: github.event_name != 'schedule'
|
||||
run: |
|
||||
VERSION=${{ github.ref_name }}
|
||||
echo "IMAGE_TAG=${VERSION:1}" >> $GITHUB_ENV
|
||||
|
||||
- name: Push image to uhub # Use 'docker buildx imagetools create' to create a new image base on source image.
|
||||
run: |
|
||||
docker buildx imagetools create \
|
||||
--tag uhub.service.ucloud.cn/greptime/greptimedb:latest \
|
||||
--tag uhub.service.ucloud.cn/greptime/greptimedb:${{ env.IMAGE_TAG }} \
|
||||
greptime/greptimedb:${{ env.IMAGE_TAG }}
|
||||
|
||||
1321
Cargo.lock
generated
1321
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
16
Cargo.toml
16
Cargo.toml
@@ -47,7 +47,7 @@ members = [
|
||||
]
|
||||
|
||||
[workspace.package]
|
||||
version = "0.1.1"
|
||||
version = "0.2.0"
|
||||
edition = "2021"
|
||||
license = "Apache-2.0"
|
||||
|
||||
@@ -59,14 +59,12 @@ arrow-schema = { version = "37.0", features = ["serde"] }
|
||||
async-stream = "0.3"
|
||||
async-trait = "0.1"
|
||||
chrono = { version = "0.4", features = ["serde"] }
|
||||
# TODO(ruihang): use arrow-datafusion when it contains https://github.com/apache/arrow-datafusion/pull/6032
|
||||
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" }
|
||||
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" }
|
||||
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" }
|
||||
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" }
|
||||
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" }
|
||||
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" }
|
||||
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" }
|
||||
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "74a778ca6016a853a3c3add3fa8c6f12f4fe4561" }
|
||||
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "74a778ca6016a853a3c3add3fa8c6f12f4fe4561" }
|
||||
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "74a778ca6016a853a3c3add3fa8c6f12f4fe4561" }
|
||||
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "74a778ca6016a853a3c3add3fa8c6f12f4fe4561" }
|
||||
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "74a778ca6016a853a3c3add3fa8c6f12f4fe4561" }
|
||||
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "74a778ca6016a853a3c3add3fa8c6f12f4fe4561" }
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
parquet = "37.0"
|
||||
|
||||
20
README.md
20
README.md
@@ -131,16 +131,16 @@ about Kubernetes deployment, check our [docs](https://docs.greptime.com/).
|
||||
SELECT * FROM monitor;
|
||||
```
|
||||
|
||||
```TEXT
|
||||
+-------+--------------------------+------+--------+
|
||||
| host | ts | cpu | memory |
|
||||
+-------+--------------------------+------+--------+
|
||||
| host1 | 2022-08-19 16:32:35+0800 | 66.6 | 1024 |
|
||||
| host2 | 2022-08-19 16:32:36+0800 | 77.7 | 2048 |
|
||||
| host3 | 2022-08-19 16:32:37+0800 | 88.8 | 4096 |
|
||||
+-------+--------------------------+------+--------+
|
||||
3 rows in set (0.03 sec)
|
||||
```
|
||||
```TEXT
|
||||
+-------+---------------------+------+--------+
|
||||
| host | ts | cpu | memory |
|
||||
+-------+---------------------+------+--------+
|
||||
| host1 | 2022-08-19 08:32:35 | 66.6 | 1024 |
|
||||
| host2 | 2022-08-19 08:32:36 | 77.7 | 2048 |
|
||||
| host3 | 2022-08-19 08:32:37 | 88.8 | 4096 |
|
||||
+-------+---------------------+------+--------+
|
||||
3 rows in set (0.01 sec)
|
||||
```
|
||||
|
||||
You can always cleanup test database by removing `/tmp/greptimedb`.
|
||||
|
||||
|
||||
@@ -1,527 +0,0 @@
|
||||
# Schema Structs
|
||||
|
||||
# Common Schemas
|
||||
The `datatypes` crate defines the elementary schema struct to describe the metadata.
|
||||
|
||||
## ColumnSchema
|
||||
[ColumnSchema](https://github.com/GreptimeTeam/greptimedb/blob/9fa871a3fad07f583dc1863a509414da393747f8/src/datatypes/src/schema/column_schema.rs#L36) represents the metadata of a column. It is equivalent to arrow's [Field](https://docs.rs/arrow/latest/arrow/datatypes/struct.Field.html) with additional metadata such as default constraint and whether the column is a time index. The time index is the column with a `TIME INDEX` constraint of a table. We can convert the `ColumnSchema` into an arrow `Field` and convert the `Field` back to the `ColumnSchema` without losing metadata.
|
||||
|
||||
```rust
|
||||
pub struct ColumnSchema {
|
||||
pub name: String,
|
||||
pub data_type: ConcreteDataType,
|
||||
is_nullable: bool,
|
||||
is_time_index: bool,
|
||||
default_constraint: Option<ColumnDefaultConstraint>,
|
||||
metadata: Metadata,
|
||||
}
|
||||
```
|
||||
|
||||
## Schema
|
||||
[Schema](https://github.com/GreptimeTeam/greptimedb/blob/9fa871a3fad07f583dc1863a509414da393747f8/src/datatypes/src/schema.rs#L38) is an ordered sequence of `ColumnSchema`. It is equivalent to arrow's [Schema](https://docs.rs/arrow/latest/arrow/datatypes/struct.Schema.html) with additional metadata including the index of the time index column and the version of this schema. Same as `ColumnSchema`, we can convert our `Schema` from/to arrow's `Schema`.
|
||||
|
||||
```rust
|
||||
use arrow::datatypes::Schema as ArrowSchema;
|
||||
|
||||
pub struct Schema {
|
||||
column_schemas: Vec<ColumnSchema>,
|
||||
name_to_index: HashMap<String, usize>,
|
||||
arrow_schema: Arc<ArrowSchema>,
|
||||
timestamp_index: Option<usize>,
|
||||
version: u32,
|
||||
}
|
||||
|
||||
pub type SchemaRef = Arc<Schema>;
|
||||
```
|
||||
|
||||
We alias `Arc<Schema>` as `SchemaRef` since it is used frequently. Mostly, we use our `ColumnSchema` and `Schema` structs instead of Arrow's `Field` and `Schema` unless we need to invoke third-party libraries (like DataFusion or ArrowFlight) that rely on Arrow.
|
||||
|
||||
## RawSchema
|
||||
`Schema` contains fields like a map from column names to their indices in the `ColumnSchema` sequences and a cached arrow `Schema`. We can construct these fields from the `ColumnSchema` sequences thus we don't want to serialize them. This is why we don't derive `Serialize` and `Deserialize` for `Schema`. We introduce a new struct [RawSchema](https://github.com/GreptimeTeam/greptimedb/blob/9fa871a3fad07f583dc1863a509414da393747f8/src/datatypes/src/schema/raw.rs#L24) which keeps all required fields of a `Schema` and derives the serialization traits. To serialize a `Schema`, we need to convert it into a `RawSchema` first and serialize the `RawSchema`.
|
||||
|
||||
```rust
|
||||
pub struct RawSchema {
|
||||
pub column_schemas: Vec<ColumnSchema>,
|
||||
pub timestamp_index: Option<usize>,
|
||||
pub version: u32,
|
||||
}
|
||||
```
|
||||
|
||||
We want to keep the `Schema` simple and avoid putting too much business-related metadata in it as many different structs or traits rely on it.
|
||||
|
||||
# Schema of the Table
|
||||
A table maintains its schema in [TableMeta](https://github.com/GreptimeTeam/greptimedb/blob/9fa871a3fad07f583dc1863a509414da393747f8/src/table/src/metadata.rs#L97).
|
||||
```rust
|
||||
pub struct TableMeta {
|
||||
pub schema: SchemaRef,
|
||||
pub primary_key_indices: Vec<usize>,
|
||||
pub value_indices: Vec<usize>,
|
||||
// ...
|
||||
}
|
||||
```
|
||||
|
||||
The order of columns in `TableMeta::schema` is the same as the order specified in the `CREATE TABLE` statement which users use to create this table.
|
||||
|
||||
The field `primary_key_indices` stores indices of primary key columns. The field `value_indices` records the indices of value columns (non-primary key and time index, we sometimes call them field columns).
|
||||
|
||||
Suppose we create a table with the following SQL
|
||||
```sql
|
||||
CREATE TABLE cpu (
|
||||
ts TIMESTAMP,
|
||||
host STRING,
|
||||
usage_user DOUBLE,
|
||||
usage_system DOUBLE,
|
||||
datacenter STRING,
|
||||
TIME INDEX (ts),
|
||||
PRIMARY KEY(datacenter, host)) ENGINE=mito WITH(regions=1);
|
||||
```
|
||||
|
||||
Then the table's `TableMeta` may look like this:
|
||||
```json
|
||||
{
|
||||
"schema":{
|
||||
"column_schemas":[
|
||||
"ts",
|
||||
"host",
|
||||
"usage_user",
|
||||
"usage_system",
|
||||
"datacenter"
|
||||
],
|
||||
"time_index":0,
|
||||
"version":0
|
||||
},
|
||||
"primary_key_indices":[
|
||||
4,
|
||||
1
|
||||
],
|
||||
"value_indices":[
|
||||
2,
|
||||
3
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
# Schemas of the storage engine
|
||||
We split a table into one or more units with the same schema and then store these units in the storage engine. Each unit is a region in the storage engine.
|
||||
|
||||
The storage engine maintains schemas of regions in more complicated ways because it
|
||||
- adds internal columns that are invisible to users to store additional metadata for each row
|
||||
- provides a data model similar to the key-value model so it organizes columns in a different order
|
||||
- maintains additional metadata like column id or column family
|
||||
|
||||
So the storage engine defines several schema structs:
|
||||
- RegionSchema
|
||||
- StoreSchema
|
||||
- ProjectedSchema
|
||||
|
||||
## RegionSchema
|
||||
A [RegionSchema](https://github.com/GreptimeTeam/greptimedb/blob/9fa871a3fad07f583dc1863a509414da393747f8/src/storage/src/schema/region.rs#L37) describes the schema of a region.
|
||||
|
||||
```rust
|
||||
pub struct RegionSchema {
|
||||
user_schema: SchemaRef,
|
||||
store_schema: StoreSchemaRef,
|
||||
columns: ColumnsMetadataRef,
|
||||
}
|
||||
```
|
||||
|
||||
Each region reserves some columns called `internal columns` for internal usage:
|
||||
- `__sequence`, sequence number of a row
|
||||
- `__op_type`, operation type of a row, such as `PUT` or `DELETE`
|
||||
- `__version`, user-specified version of a row, reserved but not used. We might remove this in the future
|
||||
|
||||
The table engine can't see the `__sequence` and `__op_type` columns, so the `RegionSchema` itself maintains two internal schemas:
|
||||
- User schema, a `Schema` struct that doesn't have internal columns
|
||||
- Store schema, a `StoreSchema` struct that has internal columns
|
||||
|
||||
The `ColumnsMetadata` struct keeps metadata about all columns but most time we only need to use metadata in user schema and store schema, so we just ignore it. We may remove this struct in the future.
|
||||
|
||||
`RegionSchema` organizes columns in the following order:
|
||||
```
|
||||
key columns, timestamp, [__version,] value columns, __sequence, __op_type
|
||||
```
|
||||
|
||||
We can ignore the `__version` column because it is disabled now:
|
||||
|
||||
```
|
||||
key columns, timestamp, value columns, __sequence, __op_type
|
||||
```
|
||||
|
||||
Key columns are columns of a table's primary key. Timestamp is the time index column. A region sorts all rows by key columns, timestamp, sequence, and op type.
|
||||
|
||||
So the `RegionSchema` of our `cpu` table above looks like this:
|
||||
```json
|
||||
{
|
||||
"user_schema":[
|
||||
"datacenter",
|
||||
"host",
|
||||
"ts",
|
||||
"usage_user",
|
||||
"usage_system"
|
||||
],
|
||||
"store_schema":[
|
||||
"datacenter",
|
||||
"host",
|
||||
"ts",
|
||||
"usage_user",
|
||||
"usage_system",
|
||||
"__sequence",
|
||||
"__op_type"
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
## StoreSchema
|
||||
As described above, a [StoreSchema](https://github.com/GreptimeTeam/greptimedb/blob/9fa871a3fad07f583dc1863a509414da393747f8/src/storage/src/schema/store.rs#L36) is a schema that knows all internal columns.
|
||||
```rust
|
||||
struct StoreSchema {
|
||||
columns: Vec<ColumnMetadata>,
|
||||
schema: SchemaRef,
|
||||
row_key_end: usize,
|
||||
user_column_end: usize,
|
||||
}
|
||||
```
|
||||
|
||||
The columns in the `columns` and `schema` fields have the same order. The `ColumnMetadata` has metadata like column id, column family id, and comment. The `StoreSchema` also stores this metadata in `StoreSchema::schema`, so we can convert the `StoreSchema` between arrow's `Schema`. We use this feature to persist the `StoreSchema` in the SST since our SST format is `Parquet`, which can take arrow's `Schema` as its schema.
|
||||
|
||||
The `StoreSchema` of the region above is similar to this:
|
||||
```json
|
||||
{
|
||||
"schema":{
|
||||
"column_schemas":[
|
||||
"datacenter",
|
||||
"host",
|
||||
"ts",
|
||||
"usage_user",
|
||||
"usage_system",
|
||||
"__sequence",
|
||||
"__op_type"
|
||||
],
|
||||
"time_index":2,
|
||||
"version":0
|
||||
},
|
||||
"row_key_end":3,
|
||||
"user_column_end":5
|
||||
}
|
||||
```
|
||||
|
||||
The key and timestamp columns form row keys of rows. We put them together so we can use `row_key_end` to get indices of all row key columns. Similarly, we can use the `user_column_end` to get indices of all user columns (non-internal columns).
|
||||
```rust
|
||||
impl StoreSchema {
|
||||
#[inline]
|
||||
pub(crate) fn row_key_indices(&self) -> impl Iterator<Item = usize> {
|
||||
0..self.row_key_end
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn value_indices(&self) -> impl Iterator<Item = usize> {
|
||||
self.row_key_end..self.user_column_end
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Another useful feature of `StoreSchema` is that we ensure it always contains key columns, a timestamp column, and internal columns because we need them to perform merge, deduplication, and delete. Projection on `StoreSchema` only projects value columns.
|
||||
|
||||
## ProjectedSchema
|
||||
To support arbitrary projection, we introduce the [ProjectedSchema](https://github.com/GreptimeTeam/greptimedb/blob/9fa871a3fad07f583dc1863a509414da393747f8/src/storage/src/schema/projected.rs#L106).
|
||||
```rust
|
||||
pub struct ProjectedSchema {
|
||||
projection: Option<Projection>,
|
||||
schema_to_read: StoreSchemaRef,
|
||||
projected_user_schema: SchemaRef,
|
||||
}
|
||||
```
|
||||
|
||||
We need to handle many cases while doing projection:
|
||||
- The columns' order of table and region is different
|
||||
- The projection can be in arbitrary order, e.g. `select usage_user, host from cpu` and `select host, usage_user from cpu` have different projection order
|
||||
- We support `ALTER TABLE` so data files may have different schemas.
|
||||
|
||||
### Projection
|
||||
Let's take an example to see how projection works. Suppose we want to select `ts`, `usage_system` from the `cpu` table.
|
||||
|
||||
```sql
|
||||
CREATE TABLE cpu (
|
||||
ts TIMESTAMP,
|
||||
host STRING,
|
||||
usage_user DOUBLE,
|
||||
usage_system DOUBLE,
|
||||
datacenter STRING,
|
||||
TIME INDEX (ts),
|
||||
PRIMARY KEY(datacenter, host)) ENGINE=mito WITH(regions=1);
|
||||
|
||||
select ts, usage_system from cpu;
|
||||
```
|
||||
|
||||
The query engine uses the projection `[0, 3]` to scan the table. However, columns in the region have a different order, so the table engine adjusts the projection to `2, 4`.
|
||||
```json
|
||||
{
|
||||
"user_schema":[
|
||||
"datacenter",
|
||||
"host",
|
||||
"ts",
|
||||
"usage_user",
|
||||
"usage_system"
|
||||
],
|
||||
}
|
||||
```
|
||||
|
||||
As you can see, the output order is still `[ts, usage_system]`. This is the schema users can see after projection so we call it `projected user schema`.
|
||||
|
||||
But the storage engine also needs to read key columns, a timestamp column, and internal columns. So we maintain a `StoreSchema` after projection in the `ProjectedSchema`.
|
||||
|
||||
The `Projection` struct is a helper struct to help compute the projected user schema and store schema.
|
||||
|
||||
So we can construct the following `ProjectedSchema`:
|
||||
```json
|
||||
{
|
||||
"schema_to_read":{
|
||||
"schema":{
|
||||
"column_schemas":[
|
||||
"datacenter",
|
||||
"host",
|
||||
"ts",
|
||||
"usage_system",
|
||||
"__sequence",
|
||||
"__op_type"
|
||||
],
|
||||
"time_index":2,
|
||||
"version":0
|
||||
},
|
||||
"row_key_end":3,
|
||||
"user_column_end":4
|
||||
},
|
||||
"projected_user_schema":{
|
||||
"column_schemas":[
|
||||
"ts",
|
||||
"usage_system"
|
||||
],
|
||||
"time_index":0
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
As you can see, `schema_to_read` doesn't contain the column `usage_user` that is not intended to be read (not in projection).
|
||||
|
||||
### ReadAdapter
|
||||
As mentioned above, we can alter a table so the underlying files (SSTs) and memtables in the storage engine may have different schemas.
|
||||
|
||||
To simplify the logic of `ProjectedSchema`, we handle the difference between schemas before projection (constructing the `ProjectedSchema`). We introduce [ReadAdapter](https://github.com/GreptimeTeam/greptimedb/blob/9fa871a3fad07f583dc1863a509414da393747f8/src/storage/src/schema/compat.rs#L90) that adapts rows with different source schemas to the same expected schema.
|
||||
|
||||
So we can always use the current `RegionSchema` of the region to construct the `ProjectedSchema`, and then create a `ReadAdapter` for each memtable or SST.
|
||||
```rust
|
||||
#[derive(Debug)]
|
||||
pub struct ReadAdapter {
|
||||
source_schema: StoreSchemaRef,
|
||||
dest_schema: ProjectedSchemaRef,
|
||||
indices_in_result: Vec<Option<usize>>,
|
||||
is_source_needed: Vec<bool>,
|
||||
}
|
||||
```
|
||||
|
||||
For each column required by `dest_schema`, `indices_in_result` stores the index of that column in the row read from the source memtable or SST. If the source row doesn't contain that column, the index is `None`.
|
||||
|
||||
The field `is_source_needed` stores whether a column in the source memtable or SST is needed.
|
||||
|
||||
Suppose we add a new column `usage_idle` to the table `cpu`.
|
||||
```sql
|
||||
ALTER TABLE cpu ADD COLUMN usage_idle DOUBLE;
|
||||
```
|
||||
|
||||
The new `StoreSchema` becomes:
|
||||
```json
|
||||
{
|
||||
"schema":{
|
||||
"column_schemas":[
|
||||
"datacenter",
|
||||
"host",
|
||||
"ts",
|
||||
"usage_user",
|
||||
"usage_system",
|
||||
"usage_idle",
|
||||
"__sequence",
|
||||
"__op_type"
|
||||
],
|
||||
"time_index":2,
|
||||
"version":1
|
||||
},
|
||||
"row_key_end":3,
|
||||
"user_column_end":6
|
||||
}
|
||||
```
|
||||
|
||||
Note that we bump the version of the schema to 1.
|
||||
|
||||
If we want to select `ts`, `usage_system`, and `usage_idle`. While reading from the old schema, the storage engine creates a `ReadAdapter` like this:
|
||||
```json
|
||||
{
|
||||
"source_schema":{
|
||||
"schema":{
|
||||
"column_schemas":[
|
||||
"datacenter",
|
||||
"host",
|
||||
"ts",
|
||||
"usage_user",
|
||||
"usage_system",
|
||||
"__sequence",
|
||||
"__op_type"
|
||||
],
|
||||
"time_index":2,
|
||||
"version":0
|
||||
},
|
||||
"row_key_end":3,
|
||||
"user_column_end":5
|
||||
},
|
||||
"dest_schema":{
|
||||
"schema_to_read":{
|
||||
"schema":{
|
||||
"column_schemas":[
|
||||
"datacenter",
|
||||
"host",
|
||||
"ts",
|
||||
"usage_system",
|
||||
"usage_idle",
|
||||
"__sequence",
|
||||
"__op_type"
|
||||
],
|
||||
"time_index":2,
|
||||
"version":1
|
||||
},
|
||||
"row_key_end":3,
|
||||
"user_column_end":5
|
||||
},
|
||||
"projected_user_schema":{
|
||||
"column_schemas":[
|
||||
"ts",
|
||||
"usage_system",
|
||||
"usage_idle"
|
||||
],
|
||||
"time_index":0
|
||||
}
|
||||
},
|
||||
"indices_in_result":[
|
||||
0,
|
||||
1,
|
||||
2,
|
||||
3,
|
||||
null,
|
||||
4,
|
||||
5
|
||||
],
|
||||
"is_source_needed":[
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
false,
|
||||
true,
|
||||
true,
|
||||
true
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
We don't need to read `usage_user` so `is_source_needed[3]` is false. The old schema doesn't have column `usage_idle` so `indices_in_result[4]` is `null` and the `ReadAdapter` needs to insert a null column to the output row so the output schema still contains `usage_idle`.
|
||||
|
||||
The figure below shows the relationship between `RegionSchema`, `StoreSchema`, `ProjectedSchema`, and `ReadAdapter`.
|
||||
|
||||
```text
|
||||
┌──────────────────────────────┐
|
||||
│ │
|
||||
│ ┌────────────────────┐ │
|
||||
│ │ store_schema │ │
|
||||
│ │ │ │
|
||||
│ │ StoreSchema │ │
|
||||
│ │ version 1 │ │
|
||||
│ └────────────────────┘ │
|
||||
│ │
|
||||
│ ┌────────────────────┐ │
|
||||
│ │ user_schema │ │
|
||||
│ └────────────────────┘ │
|
||||
│ │
|
||||
│ RegionSchema │
|
||||
│ │
|
||||
└──────────────┬───────────────┘
|
||||
│
|
||||
│
|
||||
│
|
||||
┌──────────────▼───────────────┐
|
||||
│ │
|
||||
│ ┌──────────────────────────┐ │
|
||||
│ │ schema_to_read │ │
|
||||
│ │ │ │
|
||||
│ │ StoreSchema (projected) │ │
|
||||
│ │ version 1 │ │
|
||||
│ └──────────────────────────┘ │
|
||||
┌───┤ ├───┐
|
||||
│ │ ┌──────────────────────────┐ │ │
|
||||
│ │ │ projected_user_schema │ │ │
|
||||
│ │ └──────────────────────────┘ │ │
|
||||
│ │ │ │
|
||||
│ │ ProjectedSchema │ │
|
||||
dest schema │ └──────────────────────────────┘ │ dest schema
|
||||
│ │
|
||||
│ │
|
||||
┌──────▼───────┐ ┌───────▼──────┐
|
||||
│ │ │ │
|
||||
│ ReadAdapter │ │ ReadAdapter │
|
||||
│ │ │ │
|
||||
└──────▲───────┘ └───────▲──────┘
|
||||
│ │
|
||||
│ │
|
||||
source schema │ │ source schema
|
||||
│ │
|
||||
┌───────┴─────────┐ ┌────────┴────────┐
|
||||
│ │ │ │
|
||||
│ ┌─────────────┐ │ │ ┌─────────────┐ │
|
||||
│ │ │ │ │ │ │ │
|
||||
│ │ StoreSchema │ │ │ │ StoreSchema │ │
|
||||
│ │ │ │ │ │ │ │
|
||||
│ │ version 0 │ │ │ │ version 1 │ │
|
||||
│ │ │ │ │ │ │ │
|
||||
│ └─────────────┘ │ │ └─────────────┘ │
|
||||
│ │ │ │
|
||||
│ SST 0 │ │ SST 1 │
|
||||
│ │ │ │
|
||||
└─────────────────┘ └─────────────────┘
|
||||
```
|
||||
|
||||
# Conversion
|
||||
This figure shows the conversion between schemas:
|
||||
```text
|
||||
┌─────────────┐ schema From ┌─────────────┐
|
||||
│ ├──────────────────┐ ┌────────────────────────────► │
|
||||
│ TableMeta │ │ │ │ RawSchema │
|
||||
│ │ │ │ ┌─────────────────────────┤ │
|
||||
└─────────────┘ │ │ │ TryFrom └─────────────┘
|
||||
│ │ │
|
||||
│ │ │
|
||||
│ │ │
|
||||
│ │ │
|
||||
│ │ │
|
||||
┌───────────────────┐ ┌─────▼──┴──▼──┐ arrow_schema() ┌─────────────────┐
|
||||
│ │ │ ├─────────────────────► │
|
||||
│ ColumnsMetadata │ ┌─────► Schema │ │ ArrowSchema ├──┐
|
||||
│ │ │ │ ◄─────────────────────┤ │ │
|
||||
└────┬───────────▲──┘ │ └───▲───▲──────┘ TryFrom └─────────────────┘ │
|
||||
│ │ │ │ │ │
|
||||
│ │ │ │ └────────────────────────────────────────┐ │
|
||||
│ │ │ │ │ │
|
||||
│ columns │ user_schema() │ │ │
|
||||
│ │ │ │ projected_user_schema() schema() │
|
||||
│ │ │ │ │ │
|
||||
│ ┌───┴─────────────┴─┐ │ ┌────────────────────┐ │ │
|
||||
columns │ │ │ └─────────────────┤ │ │ │ TryFrom
|
||||
│ │ RegionSchema │ │ ProjectedSchema │ │ │
|
||||
│ │ ├─────────────────────────► │ │ │
|
||||
│ └─────────────────┬─┘ ProjectedSchema::new() └──────────────────┬─┘ │ │
|
||||
│ │ │ │ │
|
||||
│ │ │ │ │
|
||||
│ │ │ │ │
|
||||
│ │ │ │ │
|
||||
┌────▼────────────────────┐ │ store_schema() ┌────▼───────┴──┐ │
|
||||
│ │ └─────────────────────────────────────────► │ │
|
||||
│ Vec<ColumnMetadata> │ │ StoreSchema ◄─────┘
|
||||
│ ◄──────────────────────────────────────────────┤ │
|
||||
└─────────────────────────┘ columns └───────────────┘
|
||||
```
|
||||
@@ -51,17 +51,13 @@ get_os_type
|
||||
get_arch_type
|
||||
|
||||
if [ -n "${OS_TYPE}" ] && [ -n "${ARCH_TYPE}" ]; then
|
||||
# Use the latest nightly version.
|
||||
if [ "${VERSION}" = "latest" ]; then
|
||||
VERSION=$(curl -s -XGET "https://api.github.com/repos/${GITHUB_ORG}/${GITHUB_REPO}/releases" | grep tag_name | grep nightly | cut -d: -f 2 | sed 's/.*"\(.*\)".*/\1/' | uniq | sort -r | head -n 1)
|
||||
if [ -z "${VERSION}" ]; then
|
||||
echo "Failed to get the latest version."
|
||||
exit 1
|
||||
fi
|
||||
fi
|
||||
|
||||
echo "Downloading ${BIN}, OS: ${OS_TYPE}, Arch: ${ARCH_TYPE}, Version: ${VERSION}"
|
||||
|
||||
wget "https://github.com/${GITHUB_ORG}/${GITHUB_REPO}/releases/download/${VERSION}/${BIN}-${OS_TYPE}-${ARCH_TYPE}.tgz"
|
||||
if [ "${VERSION}" = "latest" ]; then
|
||||
wget "https://github.com/${GITHUB_ORG}/${GITHUB_REPO}/releases/latest/download/${BIN}-${OS_TYPE}-${ARCH_TYPE}.tgz"
|
||||
else
|
||||
wget "https://github.com/${GITHUB_ORG}/${GITHUB_REPO}/releases/download/${VERSION}/${BIN}-${OS_TYPE}-${ARCH_TYPE}.tgz"
|
||||
fi
|
||||
|
||||
tar xvf ${BIN}-${OS_TYPE}-${ARCH_TYPE}.tgz && rm ${BIN}-${OS_TYPE}-${ARCH_TYPE}.tgz && echo "Run './${BIN} --help' to get started"
|
||||
fi
|
||||
|
||||
@@ -109,16 +109,10 @@ impl CatalogProvider for CatalogProviderAdapter {
|
||||
}
|
||||
|
||||
///Greptime CatalogProvider -> datafusion's CatalogProvider
|
||||
pub struct DfCatalogProviderAdapter {
|
||||
struct DfCatalogProviderAdapter {
|
||||
catalog_provider: CatalogProviderRef,
|
||||
}
|
||||
|
||||
impl DfCatalogProviderAdapter {
|
||||
pub fn new(catalog_provider: CatalogProviderRef) -> Self {
|
||||
Self { catalog_provider }
|
||||
}
|
||||
}
|
||||
|
||||
impl DfCatalogProvider for DfCatalogProviderAdapter {
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
|
||||
@@ -16,11 +16,9 @@ async-compression = { version = "0.3", features = [
|
||||
"tokio",
|
||||
] }
|
||||
async-trait.workspace = true
|
||||
bytes = "1.1"
|
||||
common-error = { path = "../error" }
|
||||
common-runtime = { path = "../runtime" }
|
||||
datafusion.workspace = true
|
||||
derive_builder = "0.12"
|
||||
futures.workspace = true
|
||||
object-store = { path = "../../object-store" }
|
||||
regex = "1.7"
|
||||
|
||||
@@ -13,14 +13,10 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt::Display;
|
||||
use std::io;
|
||||
use std::str::FromStr;
|
||||
|
||||
use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder, ZstdDecoder};
|
||||
use bytes::Bytes;
|
||||
use futures::Stream;
|
||||
use tokio::io::{AsyncRead, BufReader};
|
||||
use tokio_util::io::{ReaderStream, StreamReader};
|
||||
|
||||
use crate::error::{self, Error, Result};
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
@@ -85,25 +81,4 @@ impl CompressionType {
|
||||
CompressionType::UNCOMPRESSED => Box::new(s),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn convert_stream<T: Stream<Item = io::Result<Bytes>> + Unpin + Send + 'static>(
|
||||
&self,
|
||||
s: T,
|
||||
) -> Box<dyn Stream<Item = io::Result<Bytes>> + Send + Unpin> {
|
||||
match self {
|
||||
CompressionType::GZIP => {
|
||||
Box::new(ReaderStream::new(GzipDecoder::new(StreamReader::new(s))))
|
||||
}
|
||||
CompressionType::BZIP2 => {
|
||||
Box::new(ReaderStream::new(BzDecoder::new(StreamReader::new(s))))
|
||||
}
|
||||
CompressionType::XZ => {
|
||||
Box::new(ReaderStream::new(XzDecoder::new(StreamReader::new(s))))
|
||||
}
|
||||
CompressionType::ZSTD => {
|
||||
Box::new(ReaderStream::new(ZstdDecoder::new(StreamReader::new(s))))
|
||||
}
|
||||
CompressionType::UNCOMPRESSED => Box::new(s),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,108 +15,16 @@
|
||||
pub mod csv;
|
||||
pub mod json;
|
||||
pub mod parquet;
|
||||
#[cfg(test)]
|
||||
pub mod tests;
|
||||
|
||||
pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;
|
||||
|
||||
use std::result;
|
||||
use std::sync::Arc;
|
||||
use std::task::Poll;
|
||||
|
||||
use arrow::datatypes::SchemaRef;
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use arrow_schema::ArrowError;
|
||||
use async_trait::async_trait;
|
||||
use bytes::{Buf, Bytes};
|
||||
use datafusion::error::{DataFusionError, Result as DataFusionResult};
|
||||
use datafusion::physical_plan::file_format::FileOpenFuture;
|
||||
use futures::StreamExt;
|
||||
use object_store::ObjectStore;
|
||||
|
||||
use crate::compression::CompressionType;
|
||||
use crate::error::Result;
|
||||
|
||||
#[async_trait]
|
||||
pub trait FileFormat: Send + Sync + std::fmt::Debug {
|
||||
async fn infer_schema(&self, store: &ObjectStore, path: String) -> Result<SchemaRef>;
|
||||
}
|
||||
|
||||
pub trait ArrowDecoder: Send + 'static {
|
||||
/// Decode records from `buf` returning the number of bytes read.
|
||||
///
|
||||
/// This method returns `Ok(0)` once `batch_size` objects have been parsed since the
|
||||
/// last call to [`Self::flush`], or `buf` is exhausted.
|
||||
///
|
||||
/// Any remaining bytes should be included in the next call to [`Self::decode`].
|
||||
fn decode(&mut self, buf: &[u8]) -> result::Result<usize, ArrowError>;
|
||||
|
||||
/// Flushes the currently buffered data to a [`RecordBatch`].
|
||||
///
|
||||
/// This should only be called after [`Self::decode`] has returned `Ok(0)`,
|
||||
/// otherwise may return an error if part way through decoding a record
|
||||
///
|
||||
/// Returns `Ok(None)` if no buffered data.
|
||||
fn flush(&mut self) -> result::Result<Option<RecordBatch>, ArrowError>;
|
||||
}
|
||||
|
||||
impl ArrowDecoder for arrow::csv::reader::Decoder {
|
||||
fn decode(&mut self, buf: &[u8]) -> result::Result<usize, ArrowError> {
|
||||
self.decode(buf)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> result::Result<Option<RecordBatch>, ArrowError> {
|
||||
self.flush()
|
||||
}
|
||||
}
|
||||
|
||||
impl ArrowDecoder for arrow::json::RawDecoder {
|
||||
fn decode(&mut self, buf: &[u8]) -> result::Result<usize, ArrowError> {
|
||||
self.decode(buf)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> result::Result<Option<RecordBatch>, ArrowError> {
|
||||
self.flush()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn open_with_decoder<T: ArrowDecoder, F: Fn() -> DataFusionResult<T>>(
|
||||
object_store: Arc<ObjectStore>,
|
||||
path: String,
|
||||
compression_type: CompressionType,
|
||||
decoder_factory: F,
|
||||
) -> DataFusionResult<FileOpenFuture> {
|
||||
let mut decoder = decoder_factory()?;
|
||||
Ok(Box::pin(async move {
|
||||
let reader = object_store
|
||||
.reader(&path)
|
||||
.await
|
||||
.map_err(|e| DataFusionError::External(Box::new(e)))?;
|
||||
|
||||
let mut upstream = compression_type.convert_stream(reader).fuse();
|
||||
|
||||
let mut buffered = Bytes::new();
|
||||
|
||||
let stream = futures::stream::poll_fn(move |cx| {
|
||||
loop {
|
||||
if buffered.is_empty() {
|
||||
if let Some(result) = futures::ready!(upstream.poll_next_unpin(cx)) {
|
||||
buffered = result?;
|
||||
};
|
||||
}
|
||||
|
||||
let decoded = decoder.decode(buffered.as_ref())?;
|
||||
|
||||
if decoded == 0 {
|
||||
break;
|
||||
} else {
|
||||
buffered.advance(decoded);
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Ready(decoder.flush().transpose())
|
||||
});
|
||||
|
||||
Ok(stream.boxed())
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -14,21 +14,17 @@
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow::csv;
|
||||
use arrow::csv::reader::infer_reader_schema as infer_csv_schema;
|
||||
use arrow_schema::SchemaRef;
|
||||
use async_trait::async_trait;
|
||||
use common_runtime;
|
||||
use datafusion::error::Result as DataFusionResult;
|
||||
use datafusion::physical_plan::file_format::{FileMeta, FileOpenFuture, FileOpener};
|
||||
use derive_builder::Builder;
|
||||
use object_store::ObjectStore;
|
||||
use snafu::ResultExt;
|
||||
use tokio_util::io::SyncIoBridge;
|
||||
|
||||
use crate::compression::CompressionType;
|
||||
use crate::error::{self, Result};
|
||||
use crate::file_format::{self, open_with_decoder, FileFormat};
|
||||
use crate::file_format::{self, FileFormat};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct CsvFormat {
|
||||
@@ -49,67 +45,6 @@ impl Default for CsvFormat {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Builder)]
|
||||
pub struct CsvConfig {
|
||||
batch_size: usize,
|
||||
file_schema: SchemaRef,
|
||||
#[builder(default = "None")]
|
||||
file_projection: Option<Vec<usize>>,
|
||||
#[builder(default = "true")]
|
||||
has_header: bool,
|
||||
#[builder(default = "b','")]
|
||||
delimiter: u8,
|
||||
}
|
||||
|
||||
impl CsvConfig {
|
||||
fn builder(&self) -> csv::ReaderBuilder {
|
||||
let mut builder = csv::ReaderBuilder::new()
|
||||
.with_schema(self.file_schema.clone())
|
||||
.with_delimiter(self.delimiter)
|
||||
.with_batch_size(self.batch_size)
|
||||
.has_header(self.has_header);
|
||||
|
||||
if let Some(proj) = &self.file_projection {
|
||||
builder = builder.with_projection(proj.clone());
|
||||
}
|
||||
|
||||
builder
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct CsvOpener {
|
||||
config: Arc<CsvConfig>,
|
||||
object_store: Arc<ObjectStore>,
|
||||
compression_type: CompressionType,
|
||||
}
|
||||
|
||||
impl CsvOpener {
|
||||
/// Return a new [`CsvOpener`]. The caller must ensure [`CsvConfig`].file_schema must correspond to the opening file.
|
||||
pub fn new(
|
||||
config: CsvConfig,
|
||||
object_store: ObjectStore,
|
||||
compression_type: CompressionType,
|
||||
) -> Self {
|
||||
CsvOpener {
|
||||
config: Arc::new(config),
|
||||
object_store: Arc::new(object_store),
|
||||
compression_type,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FileOpener for CsvOpener {
|
||||
fn open(&self, meta: FileMeta) -> DataFusionResult<FileOpenFuture> {
|
||||
open_with_decoder(
|
||||
self.object_store.clone(),
|
||||
meta.location().to_string(),
|
||||
self.compression_type,
|
||||
|| Ok(self.config.builder().build_decoder()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl FileFormat for CsvFormat {
|
||||
async fn infer_schema(&self, store: &ObjectStore, path: String) -> Result<SchemaRef> {
|
||||
|
||||
@@ -17,18 +17,15 @@ use std::sync::Arc;
|
||||
|
||||
use arrow::datatypes::SchemaRef;
|
||||
use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
|
||||
use arrow::json::RawReaderBuilder;
|
||||
use async_trait::async_trait;
|
||||
use common_runtime;
|
||||
use datafusion::error::{DataFusionError, Result as DataFusionResult};
|
||||
use datafusion::physical_plan::file_format::{FileMeta, FileOpenFuture, FileOpener};
|
||||
use object_store::ObjectStore;
|
||||
use snafu::ResultExt;
|
||||
use tokio_util::io::SyncIoBridge;
|
||||
|
||||
use crate::compression::CompressionType;
|
||||
use crate::error::{self, Result};
|
||||
use crate::file_format::{self, open_with_decoder, FileFormat};
|
||||
use crate::file_format::{self, FileFormat};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct JsonFormat {
|
||||
@@ -72,47 +69,6 @@ impl FileFormat for JsonFormat {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct JsonOpener {
|
||||
batch_size: usize,
|
||||
projected_schema: SchemaRef,
|
||||
object_store: Arc<ObjectStore>,
|
||||
compression_type: CompressionType,
|
||||
}
|
||||
|
||||
impl JsonOpener {
|
||||
/// Return a new [`JsonOpener`]. Any fields not present in `projected_schema` will be ignored.
|
||||
pub fn new(
|
||||
batch_size: usize,
|
||||
projected_schema: SchemaRef,
|
||||
object_store: ObjectStore,
|
||||
compression_type: CompressionType,
|
||||
) -> Self {
|
||||
Self {
|
||||
batch_size,
|
||||
projected_schema,
|
||||
object_store: Arc::new(object_store),
|
||||
compression_type,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FileOpener for JsonOpener {
|
||||
fn open(&self, meta: FileMeta) -> DataFusionResult<FileOpenFuture> {
|
||||
open_with_decoder(
|
||||
self.object_store.clone(),
|
||||
meta.location().to_string(),
|
||||
self.compression_type,
|
||||
|| {
|
||||
RawReaderBuilder::new(self.projected_schema.clone())
|
||||
.with_batch_size(self.batch_size)
|
||||
.build_decoder()
|
||||
.map_err(DataFusionError::from)
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -1,161 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::vec;
|
||||
|
||||
use arrow_schema::SchemaRef;
|
||||
use datafusion::assert_batches_eq;
|
||||
use datafusion::datasource::listing::PartitionedFile;
|
||||
use datafusion::datasource::object_store::ObjectStoreUrl;
|
||||
use datafusion::physical_plan::file_format::{FileOpener, FileScanConfig, FileStream};
|
||||
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
|
||||
use futures::StreamExt;
|
||||
|
||||
use crate::compression::CompressionType;
|
||||
use crate::file_format::csv::{CsvConfigBuilder, CsvOpener};
|
||||
use crate::file_format::json::JsonOpener;
|
||||
use crate::test_util::{self, test_basic_schema, test_store};
|
||||
|
||||
fn scan_config(file_schema: SchemaRef, limit: Option<usize>, filename: &str) -> FileScanConfig {
|
||||
FileScanConfig {
|
||||
object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used
|
||||
file_schema,
|
||||
file_groups: vec![vec![PartitionedFile::new(filename.to_string(), 10)]],
|
||||
statistics: Default::default(),
|
||||
projection: None,
|
||||
limit,
|
||||
table_partition_cols: vec![],
|
||||
output_ordering: None,
|
||||
infinite_source: false,
|
||||
}
|
||||
}
|
||||
|
||||
struct Test<'a, T: FileOpener> {
|
||||
config: FileScanConfig,
|
||||
opener: T,
|
||||
expected: Vec<&'a str>,
|
||||
}
|
||||
|
||||
impl<'a, T: FileOpener> Test<'a, T> {
|
||||
pub async fn run(self) {
|
||||
let result = FileStream::new(
|
||||
&self.config,
|
||||
0,
|
||||
self.opener,
|
||||
&ExecutionPlanMetricsSet::new(),
|
||||
)
|
||||
.unwrap()
|
||||
.map(|b| b.unwrap())
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
|
||||
assert_batches_eq!(self.expected, &result);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_json_opener() {
|
||||
let store = test_store("/");
|
||||
|
||||
let schema = test_basic_schema();
|
||||
|
||||
let json_opener = JsonOpener::new(
|
||||
100,
|
||||
schema.clone(),
|
||||
store.clone(),
|
||||
CompressionType::UNCOMPRESSED,
|
||||
);
|
||||
|
||||
let path = &test_util::get_data_dir("tests/json/basic.json")
|
||||
.display()
|
||||
.to_string();
|
||||
let tests = [
|
||||
Test {
|
||||
config: scan_config(schema.clone(), None, path),
|
||||
opener: json_opener.clone(),
|
||||
expected: vec![
|
||||
"+-----+-------+",
|
||||
"| num | str |",
|
||||
"+-----+-------+",
|
||||
"| 5 | test |",
|
||||
"| 2 | hello |",
|
||||
"| 4 | foo |",
|
||||
"+-----+-------+",
|
||||
],
|
||||
},
|
||||
Test {
|
||||
config: scan_config(schema.clone(), Some(1), path),
|
||||
opener: json_opener.clone(),
|
||||
expected: vec![
|
||||
"+-----+------+",
|
||||
"| num | str |",
|
||||
"+-----+------+",
|
||||
"| 5 | test |",
|
||||
"+-----+------+",
|
||||
],
|
||||
},
|
||||
];
|
||||
|
||||
for test in tests {
|
||||
test.run().await;
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_csv_opener() {
|
||||
let store = test_store("/");
|
||||
|
||||
let schema = test_basic_schema();
|
||||
let path = &test_util::get_data_dir("tests/csv/basic.csv")
|
||||
.display()
|
||||
.to_string();
|
||||
let csv_conf = CsvConfigBuilder::default()
|
||||
.batch_size(test_util::TEST_BATCH_SIZE)
|
||||
.file_schema(schema.clone())
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let csv_opener = CsvOpener::new(csv_conf, store, CompressionType::UNCOMPRESSED);
|
||||
|
||||
let tests = [
|
||||
Test {
|
||||
config: scan_config(schema.clone(), None, path),
|
||||
opener: csv_opener.clone(),
|
||||
expected: vec![
|
||||
"+-----+-------+",
|
||||
"| num | str |",
|
||||
"+-----+-------+",
|
||||
"| 5 | test |",
|
||||
"| 2 | hello |",
|
||||
"| 4 | foo |",
|
||||
"+-----+-------+",
|
||||
],
|
||||
},
|
||||
Test {
|
||||
config: scan_config(schema.clone(), Some(1), path),
|
||||
opener: csv_opener.clone(),
|
||||
expected: vec![
|
||||
"+-----+------+",
|
||||
"| num | str |",
|
||||
"+-----+------+",
|
||||
"| 5 | test |",
|
||||
"+-----+------+",
|
||||
],
|
||||
},
|
||||
];
|
||||
|
||||
for test in tests {
|
||||
test.run().await;
|
||||
}
|
||||
}
|
||||
@@ -17,6 +17,5 @@ pub mod error;
|
||||
pub mod file_format;
|
||||
pub mod lister;
|
||||
pub mod object_store;
|
||||
#[cfg(test)]
|
||||
pub mod test_util;
|
||||
pub mod util;
|
||||
|
||||
@@ -13,14 +13,11 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow_schema::{DataType, Field, Schema, SchemaRef};
|
||||
use arrow_schema::SchemaRef;
|
||||
use object_store::services::Fs;
|
||||
use object_store::ObjectStore;
|
||||
|
||||
pub const TEST_BATCH_SIZE: usize = 100;
|
||||
|
||||
pub fn get_data_dir(path: &str) -> PathBuf {
|
||||
// https://doc.rust-lang.org/cargo/reference/environment-variables.html
|
||||
let dir = env!("CARGO_MANIFEST_DIR");
|
||||
@@ -49,11 +46,3 @@ pub fn test_store(root: &str) -> ObjectStore {
|
||||
|
||||
ObjectStore::new(builder).unwrap().finish()
|
||||
}
|
||||
|
||||
pub fn test_basic_schema() -> SchemaRef {
|
||||
let schema = Schema::new(vec![
|
||||
Field::new("num", DataType::Int64, false),
|
||||
Field::new("str", DataType::Utf8, false),
|
||||
]);
|
||||
Arc::new(schema)
|
||||
}
|
||||
|
||||
@@ -1,3 +0,0 @@
|
||||
{"num":5,"str":"test"}
|
||||
{"num":2,"str":"hello"}
|
||||
{"num":4,"str":"foo"}
|
||||
@@ -14,18 +14,16 @@ common-error = { path = "../error" }
|
||||
common-telemetry = { path = "../telemetry" }
|
||||
datafusion.workspace = true
|
||||
datafusion-expr.workspace = true
|
||||
datafusion-substrait.workspace = true
|
||||
datatypes = { path = "../../datatypes" }
|
||||
futures = "0.3"
|
||||
prost.workspace = true
|
||||
session = { path = "../../session" }
|
||||
snafu.workspace = true
|
||||
table = { path = "../../table" }
|
||||
query = { path = "../../query" }
|
||||
|
||||
[dependencies.substrait_proto]
|
||||
package = "substrait"
|
||||
version = "0.7"
|
||||
version = "0.4"
|
||||
|
||||
[dev-dependencies]
|
||||
datatypes = { path = "../../datatypes" }
|
||||
|
||||
@@ -49,10 +49,10 @@ use crate::error::{
|
||||
use crate::schema::{from_schema, to_schema};
|
||||
use crate::SubstraitPlan;
|
||||
|
||||
pub struct DFLogicalSubstraitConvertorDeprecated;
|
||||
pub struct DFLogicalSubstraitConvertor;
|
||||
|
||||
#[async_trait]
|
||||
impl SubstraitPlan for DFLogicalSubstraitConvertorDeprecated {
|
||||
impl SubstraitPlan for DFLogicalSubstraitConvertor {
|
||||
type Error = Error;
|
||||
|
||||
type Plan = LogicalPlan;
|
||||
@@ -76,7 +76,7 @@ impl SubstraitPlan for DFLogicalSubstraitConvertorDeprecated {
|
||||
}
|
||||
}
|
||||
|
||||
impl DFLogicalSubstraitConvertorDeprecated {
|
||||
impl DFLogicalSubstraitConvertor {
|
||||
async fn convert_plan(
|
||||
&self,
|
||||
mut plan: Plan,
|
||||
@@ -197,14 +197,6 @@ impl DFLogicalSubstraitConvertorDeprecated {
|
||||
name: "Cross Relation",
|
||||
}
|
||||
.fail()?,
|
||||
RelType::HashJoin(_) => UnsupportedPlanSnafu {
|
||||
name: "Cross Relation",
|
||||
}
|
||||
.fail()?,
|
||||
RelType::MergeJoin(_) => UnsupportedPlanSnafu {
|
||||
name: "Cross Relation",
|
||||
}
|
||||
.fail()?,
|
||||
};
|
||||
|
||||
Ok(logical_plan)
|
||||
@@ -319,7 +311,7 @@ impl DFLogicalSubstraitConvertorDeprecated {
|
||||
}
|
||||
}
|
||||
|
||||
impl DFLogicalSubstraitConvertorDeprecated {
|
||||
impl DFLogicalSubstraitConvertor {
|
||||
fn logical_plan_to_rel(
|
||||
&self,
|
||||
ctx: &mut ConvertorContext,
|
||||
@@ -593,7 +585,7 @@ mod test {
|
||||
}
|
||||
|
||||
async fn logical_plan_round_trip(plan: LogicalPlan, catalog: CatalogManagerRef) {
|
||||
let convertor = DFLogicalSubstraitConvertorDeprecated;
|
||||
let convertor = DFLogicalSubstraitConvertor;
|
||||
|
||||
let proto = convertor.encode(plan.clone()).unwrap();
|
||||
let tripped_plan = convertor.decode(proto, catalog).await.unwrap();
|
||||
|
||||
@@ -1,72 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use catalog::datafusion::catalog_adapter::DfCatalogProviderAdapter;
|
||||
use catalog::CatalogManagerRef;
|
||||
use common_catalog::consts::DEFAULT_CATALOG_NAME;
|
||||
use datafusion::prelude::SessionContext;
|
||||
use datafusion_expr::LogicalPlan;
|
||||
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
|
||||
use datafusion_substrait::logical_plan::producer::to_substrait_plan;
|
||||
use prost::Message;
|
||||
use snafu::ResultExt;
|
||||
use substrait_proto::proto::Plan;
|
||||
|
||||
use crate::error::{DecodeDfPlanSnafu, DecodeRelSnafu, EncodeDfPlanSnafu, EncodeRelSnafu, Error};
|
||||
use crate::SubstraitPlan;
|
||||
|
||||
pub struct DFLogicalSubstraitConvertor;
|
||||
|
||||
#[async_trait]
|
||||
impl SubstraitPlan for DFLogicalSubstraitConvertor {
|
||||
type Error = Error;
|
||||
|
||||
type Plan = LogicalPlan;
|
||||
|
||||
async fn decode<B: Buf + Send>(
|
||||
&self,
|
||||
message: B,
|
||||
catalog_manager: CatalogManagerRef,
|
||||
) -> Result<Self::Plan, Self::Error> {
|
||||
let mut context = SessionContext::new();
|
||||
context.register_catalog(
|
||||
DEFAULT_CATALOG_NAME,
|
||||
Arc::new(DfCatalogProviderAdapter::new(
|
||||
catalog_manager
|
||||
.catalog(DEFAULT_CATALOG_NAME)
|
||||
.unwrap()
|
||||
.unwrap(),
|
||||
)),
|
||||
);
|
||||
|
||||
let plan = Plan::decode(message).context(DecodeRelSnafu)?;
|
||||
let df_plan = from_substrait_plan(&mut context, &plan)
|
||||
.await
|
||||
.context(DecodeDfPlanSnafu)?;
|
||||
Ok(df_plan)
|
||||
}
|
||||
|
||||
fn encode(&self, plan: Self::Plan) -> Result<Bytes, Self::Error> {
|
||||
let mut buf = BytesMut::new();
|
||||
|
||||
let substrait_plan = to_substrait_plan(&plan).context(EncodeDfPlanSnafu)?;
|
||||
substrait_plan.encode(&mut buf).context(EncodeRelSnafu)?;
|
||||
|
||||
Ok(buf.freeze())
|
||||
}
|
||||
}
|
||||
@@ -109,18 +109,6 @@ pub enum Error {
|
||||
#[snafu(backtrace)]
|
||||
source: catalog::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to encode DataFusion plan, source: {}", source))]
|
||||
EncodeDfPlan {
|
||||
source: datafusion::error::DataFusionError,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to decode DataFusion plan, source: {}", source))]
|
||||
DecodeDfPlan {
|
||||
source: datafusion::error::DataFusionError,
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -141,10 +129,7 @@ impl ErrorExt for Error {
|
||||
| Error::InvalidParameters { .. }
|
||||
| Error::TableNotFound { .. }
|
||||
| Error::SchemaNotMatch { .. } => StatusCode::InvalidArguments,
|
||||
Error::DFInternal { .. }
|
||||
| Error::Internal { .. }
|
||||
| Error::EncodeDfPlan { .. }
|
||||
| Error::DecodeDfPlan { .. } => StatusCode::Internal,
|
||||
Error::DFInternal { .. } | Error::Internal { .. } => StatusCode::Internal,
|
||||
Error::ConvertDfSchema { source } => source.status_code(),
|
||||
Error::ResolveTable { source, .. } => source.status_code(),
|
||||
}
|
||||
|
||||
@@ -18,7 +18,6 @@
|
||||
mod context;
|
||||
mod df_expr;
|
||||
mod df_logical;
|
||||
mod df_substrait;
|
||||
pub mod error;
|
||||
mod schema;
|
||||
mod types;
|
||||
@@ -27,7 +26,7 @@ use async_trait::async_trait;
|
||||
use bytes::{Buf, Bytes};
|
||||
use catalog::CatalogManagerRef;
|
||||
|
||||
pub use crate::df_substrait::DFLogicalSubstraitConvertor;
|
||||
pub use crate::df_logical::DFLogicalSubstraitConvertor;
|
||||
|
||||
#[async_trait]
|
||||
pub trait SubstraitPlan {
|
||||
|
||||
@@ -14,11 +14,14 @@
|
||||
|
||||
use std::any::Any;
|
||||
|
||||
use common_datasource::error::Error as DataSourceError;
|
||||
use common_error::prelude::*;
|
||||
use common_procedure::ProcedureId;
|
||||
use datafusion::parquet;
|
||||
use snafu::Location;
|
||||
use storage::error::Error as StorageError;
|
||||
use table::error::Error as TableError;
|
||||
use url::ParseError;
|
||||
|
||||
use crate::datanode::ObjectStoreConfig;
|
||||
|
||||
@@ -205,6 +208,18 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to build backend, source: {}", source))]
|
||||
BuildBackend {
|
||||
#[snafu(backtrace)]
|
||||
source: DataSourceError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to parse url, source: {}", source))]
|
||||
ParseUrl {
|
||||
source: DataSourceError,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Runtime resource error, source: {}", source))]
|
||||
RuntimeResource {
|
||||
#[snafu(backtrace)]
|
||||
@@ -214,6 +229,36 @@ pub enum Error {
|
||||
#[snafu(display("Invalid SQL, error: {}", msg))]
|
||||
InvalidSql { msg: String },
|
||||
|
||||
#[snafu(display("Invalid url: {}, error :{}", url, source))]
|
||||
InvalidUrl { url: String, source: ParseError },
|
||||
|
||||
#[snafu(display("Invalid filepath: {}", path))]
|
||||
InvalidPath { path: String },
|
||||
|
||||
#[snafu(display("Invalid connection: {}", msg))]
|
||||
InvalidConnection { msg: String },
|
||||
|
||||
#[snafu(display("Unsupported backend protocol: {}", protocol))]
|
||||
UnsupportedBackendProtocol { protocol: String },
|
||||
|
||||
#[snafu(display("Failed to regex, source: {}", source))]
|
||||
BuildRegex {
|
||||
location: Location,
|
||||
source: regex::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to list objects, source: {}", source))]
|
||||
ListObjects {
|
||||
#[snafu(backtrace)]
|
||||
source: DataSourceError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to parse the data, source: {}", source))]
|
||||
ParseDataTypes {
|
||||
#[snafu(backtrace)]
|
||||
source: common_recordbatch::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Not support SQL, error: {}", msg))]
|
||||
NotSupportSql { msg: String },
|
||||
|
||||
@@ -364,6 +409,63 @@ pub enum Error {
|
||||
source: query::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to copy data from table: {}, source: {}", table_name, source))]
|
||||
CopyTable {
|
||||
table_name: String,
|
||||
#[snafu(backtrace)]
|
||||
source: TableError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to execute table scan, source: {}", source))]
|
||||
TableScanExec {
|
||||
#[snafu(backtrace)]
|
||||
source: common_query::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"File schema mismatch at index {}, expected table schema: {} but found: {}",
|
||||
index,
|
||||
table_schema,
|
||||
file_schema
|
||||
))]
|
||||
InvalidSchema {
|
||||
index: usize,
|
||||
table_schema: String,
|
||||
file_schema: String,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to read parquet file, source: {}", source))]
|
||||
ReadParquet {
|
||||
source: parquet::errors::ParquetError,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to poll stream, source: {}", source))]
|
||||
PollStream {
|
||||
source: datafusion_common::DataFusionError,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to build parquet record batch stream, source: {}", source))]
|
||||
BuildParquetRecordBatchStream {
|
||||
location: Location,
|
||||
source: parquet::errors::ParquetError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to read object in path: {}, source: {}", path, source))]
|
||||
ReadObject {
|
||||
path: String,
|
||||
location: Location,
|
||||
source: object_store::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to write object into path: {}, source: {}", path, source))]
|
||||
WriteObject {
|
||||
path: String,
|
||||
location: Location,
|
||||
source: object_store::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Unrecognized table option: {}", source))]
|
||||
UnrecognizedTableOption {
|
||||
#[snafu(backtrace)]
|
||||
@@ -407,6 +509,12 @@ pub enum Error {
|
||||
#[snafu(backtrace)]
|
||||
source: BoxedError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to copy table to parquet file, source: {}", source))]
|
||||
WriteParquet {
|
||||
#[snafu(backtrace)]
|
||||
source: storage::error::Error,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -446,6 +554,11 @@ impl ErrorExt for Error {
|
||||
|
||||
ColumnValuesNumberMismatch { .. }
|
||||
| InvalidSql { .. }
|
||||
| InvalidUrl { .. }
|
||||
| InvalidPath { .. }
|
||||
| InvalidConnection { .. }
|
||||
| UnsupportedBackendProtocol { .. }
|
||||
| BuildRegex { .. }
|
||||
| NotSupportSql { .. }
|
||||
| KeyColumnNotFound { .. }
|
||||
| IllegalPrimaryKeysDef { .. }
|
||||
@@ -459,7 +572,8 @@ impl ErrorExt for Error {
|
||||
| DatabaseNotFound { .. }
|
||||
| MissingNodeId { .. }
|
||||
| MissingMetasrvOpts { .. }
|
||||
| ColumnNoneDefaultValue { .. } => StatusCode::InvalidArguments,
|
||||
| ColumnNoneDefaultValue { .. }
|
||||
| ParseUrl { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
// TODO(yingwen): Further categorize http error.
|
||||
StartServer { .. }
|
||||
@@ -472,13 +586,22 @@ impl ErrorExt for Error {
|
||||
| RenameTable { .. }
|
||||
| Catalog { .. }
|
||||
| MissingRequiredField { .. }
|
||||
| BuildParquetRecordBatchStream { .. }
|
||||
| InvalidSchema { .. }
|
||||
| ParseDataTypes { .. }
|
||||
| IncorrectInternalState { .. }
|
||||
| ShutdownServer { .. }
|
||||
| ShutdownInstance { .. }
|
||||
| CloseTableEngine { .. } => StatusCode::Internal,
|
||||
|
||||
InitBackend { .. } => StatusCode::StorageUnavailable,
|
||||
|
||||
BuildBackend { .. }
|
||||
| InitBackend { .. }
|
||||
| ReadParquet { .. }
|
||||
| WriteParquet { .. }
|
||||
| PollStream { .. }
|
||||
| ReadObject { .. }
|
||||
| WriteObject { .. }
|
||||
| ListObjects { .. } => StatusCode::StorageUnavailable,
|
||||
OpenLogStore { source } => source.status_code(),
|
||||
OpenStorageEngine { source } => source.status_code(),
|
||||
RuntimeResource { .. } => StatusCode::RuntimeResourcesExhausted,
|
||||
@@ -486,6 +609,8 @@ impl ErrorExt for Error {
|
||||
TableIdProviderNotFound { .. } => StatusCode::Unsupported,
|
||||
BumpTableId { source, .. } => source.status_code(),
|
||||
ColumnDefaultValue { source, .. } => source.status_code(),
|
||||
CopyTable { source, .. } => source.status_code(),
|
||||
TableScanExec { source, .. } => source.status_code(),
|
||||
UnrecognizedTableOption { .. } => StatusCode::InvalidArguments,
|
||||
RecoverProcedure { source, .. } | SubmitProcedure { source, .. } => {
|
||||
source.status_code()
|
||||
|
||||
@@ -25,9 +25,10 @@ use query::query_engine::SqlStatementExecutor;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::prelude::*;
|
||||
use sql::ast::ObjectName;
|
||||
use sql::statements::copy::{CopyTable, CopyTableArgument};
|
||||
use sql::statements::statement::Statement;
|
||||
use table::engine::TableReference;
|
||||
use table::requests::{CreateDatabaseRequest, DropTableRequest};
|
||||
use table::requests::{CopyDirection, CopyTableRequest, CreateDatabaseRequest, DropTableRequest};
|
||||
|
||||
use crate::error::{
|
||||
self, BumpTableIdSnafu, ExecuteSqlSnafu, ExecuteStatementSnafu, NotSupportSqlSnafu,
|
||||
@@ -104,6 +105,54 @@ impl Instance {
|
||||
.execute(SqlRequest::DropTable(req), query_ctx)
|
||||
.await
|
||||
}
|
||||
Statement::Copy(copy_table) => {
|
||||
let req = match copy_table {
|
||||
CopyTable::To(copy_table) => {
|
||||
let CopyTableArgument {
|
||||
location,
|
||||
connection,
|
||||
pattern,
|
||||
table_name,
|
||||
..
|
||||
} = copy_table;
|
||||
let (catalog_name, schema_name, table_name) =
|
||||
table_idents_to_full_name(&table_name, query_ctx.clone())?;
|
||||
CopyTableRequest {
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name,
|
||||
location,
|
||||
connection,
|
||||
pattern,
|
||||
direction: CopyDirection::Export,
|
||||
}
|
||||
}
|
||||
CopyTable::From(copy_table) => {
|
||||
let CopyTableArgument {
|
||||
location,
|
||||
connection,
|
||||
pattern,
|
||||
table_name,
|
||||
..
|
||||
} = copy_table;
|
||||
let (catalog_name, schema_name, table_name) =
|
||||
table_idents_to_full_name(&table_name, query_ctx.clone())?;
|
||||
CopyTableRequest {
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name,
|
||||
location,
|
||||
connection,
|
||||
pattern,
|
||||
direction: CopyDirection::Import,
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
self.sql_handler
|
||||
.execute(SqlRequest::CopyTable(req), query_ctx)
|
||||
.await
|
||||
}
|
||||
_ => NotSupportSqlSnafu {
|
||||
msg: format!("not supported to execute {stmt:?}"),
|
||||
}
|
||||
|
||||
@@ -32,6 +32,8 @@ use crate::error::{
|
||||
use crate::instance::sql::table_idents_to_full_name;
|
||||
|
||||
mod alter;
|
||||
mod copy_table_from;
|
||||
mod copy_table_to;
|
||||
mod create;
|
||||
mod drop_table;
|
||||
mod flush_table;
|
||||
@@ -44,6 +46,7 @@ pub enum SqlRequest {
|
||||
Alter(AlterTableRequest),
|
||||
DropTable(DropTableRequest),
|
||||
FlushTable(FlushTableRequest),
|
||||
CopyTable(CopyTableRequest),
|
||||
}
|
||||
|
||||
// Handler to execute SQL except query
|
||||
@@ -82,6 +85,10 @@ impl SqlHandler {
|
||||
SqlRequest::CreateDatabase(req) => self.create_database(req, query_ctx.clone()).await,
|
||||
SqlRequest::Alter(req) => self.alter(req).await,
|
||||
SqlRequest::DropTable(req) => self.drop_table(req).await,
|
||||
SqlRequest::CopyTable(req) => match req.direction {
|
||||
CopyDirection::Export => self.copy_table_to(req).await,
|
||||
CopyDirection::Import => self.copy_table_from(req).await,
|
||||
},
|
||||
SqlRequest::FlushTable(req) => self.flush_table(req).await,
|
||||
};
|
||||
if let Err(e) = &result {
|
||||
|
||||
@@ -21,6 +21,7 @@ use common_datasource::lister::{Lister, Source};
|
||||
use common_datasource::object_store::{build_backend, parse_url};
|
||||
use common_datasource::util::find_dir_and_filename;
|
||||
use common_query::Output;
|
||||
use common_recordbatch::error::DataTypesSnafu;
|
||||
use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
|
||||
use datatypes::arrow::datatypes::{DataType, SchemaRef};
|
||||
use datatypes::vectors::Helper;
|
||||
@@ -31,10 +32,10 @@ use table::engine::TableReference;
|
||||
use table::requests::{CopyTableRequest, InsertRequest};
|
||||
use tokio::io::BufReader;
|
||||
|
||||
use crate::error::{self, IntoVectorsSnafu, Result};
|
||||
use crate::statement::StatementExecutor;
|
||||
use crate::error::{self, ParseDataTypesSnafu, Result};
|
||||
use crate::sql::SqlHandler;
|
||||
|
||||
impl StatementExecutor {
|
||||
impl SqlHandler {
|
||||
pub(crate) async fn copy_table_from(&self, req: CopyTableRequest) -> Result<Output> {
|
||||
let table_ref = TableReference {
|
||||
catalog: &req.catalog_name,
|
||||
@@ -101,8 +102,9 @@ impl StatementExecutor {
|
||||
|
||||
while let Some(r) = stream.next().await {
|
||||
let record_batch = r.context(error::ReadParquetSnafu)?;
|
||||
let vectors =
|
||||
Helper::try_into_vectors(record_batch.columns()).context(IntoVectorsSnafu)?;
|
||||
let vectors = Helper::try_into_vectors(record_batch.columns())
|
||||
.context(DataTypesSnafu)
|
||||
.context(ParseDataTypesSnafu)?;
|
||||
|
||||
pending_mem_size += vectors.iter().map(|v| v.memory_size()).sum::<usize>();
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_datasource;
|
||||
use common_datasource::object_store::{build_backend, parse_url};
|
||||
use common_query::physical_plan::SessionContext;
|
||||
use common_query::Output;
|
||||
@@ -22,9 +23,9 @@ use table::engine::TableReference;
|
||||
use table::requests::CopyTableRequest;
|
||||
|
||||
use crate::error::{self, Result, WriteParquetSnafu};
|
||||
use crate::statement::StatementExecutor;
|
||||
use crate::sql::SqlHandler;
|
||||
|
||||
impl StatementExecutor {
|
||||
impl SqlHandler {
|
||||
pub(crate) async fn copy_table_to(&self, req: CopyTableRequest) -> Result<Output> {
|
||||
let table_ref = TableReference {
|
||||
catalog: &req.catalog_name,
|
||||
@@ -10,7 +10,6 @@ python = ["dep:script"]
|
||||
|
||||
[dependencies]
|
||||
api = { path = "../api" }
|
||||
async-compat = "0.2"
|
||||
async-stream.workspace = true
|
||||
async-trait = "0.1"
|
||||
catalog = { path = "../catalog" }
|
||||
@@ -18,7 +17,6 @@ chrono.workspace = true
|
||||
client = { path = "../client" }
|
||||
common-base = { path = "../common/base" }
|
||||
common-catalog = { path = "../common/catalog" }
|
||||
common-datasource = { path = "../common/datasource" }
|
||||
common-error = { path = "../common/error" }
|
||||
common-function = { path = "../common/function" }
|
||||
common-grpc = { path = "../common/grpc" }
|
||||
@@ -38,12 +36,10 @@ itertools = "0.10"
|
||||
meta-client = { path = "../meta-client" }
|
||||
mito = { path = "../mito", features = ["test"] }
|
||||
moka = { version = "0.9", features = ["future"] }
|
||||
object-store = { path = "../object-store" }
|
||||
openmetrics-parser = "0.4"
|
||||
partition = { path = "../partition" }
|
||||
prost.workspace = true
|
||||
query = { path = "../query" }
|
||||
regex = "1.6"
|
||||
script = { path = "../script", features = ["python"], optional = true }
|
||||
serde = "1.0"
|
||||
serde_json = "1.0"
|
||||
@@ -51,7 +47,6 @@ servers = { path = "../servers" }
|
||||
session = { path = "../session" }
|
||||
snafu.workspace = true
|
||||
sql = { path = "../sql" }
|
||||
storage = { path = "../storage" }
|
||||
store-api = { path = "../store-api" }
|
||||
substrait = { path = "../common/substrait" }
|
||||
table = { path = "../table" }
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
use std::any::Any;
|
||||
|
||||
use common_error::prelude::*;
|
||||
use datafusion::parquet;
|
||||
use snafu::Location;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
@@ -311,8 +310,13 @@ pub enum Error {
|
||||
source: common_grpc_expr::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to convert into vectors, source: {}", source))]
|
||||
IntoVectors {
|
||||
#[snafu(display(
|
||||
"Failed to build default value, column: {}, source: {}",
|
||||
column,
|
||||
source
|
||||
))]
|
||||
ColumnDefaultValue {
|
||||
column: String,
|
||||
#[snafu(backtrace)]
|
||||
source: datatypes::error::Error,
|
||||
},
|
||||
@@ -366,92 +370,6 @@ pub enum Error {
|
||||
#[snafu(backtrace)]
|
||||
source: script::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to build regex, source: {}", source))]
|
||||
BuildRegex {
|
||||
location: Location,
|
||||
source: regex::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to copy table: {}, source: {}", table_name, source))]
|
||||
CopyTable {
|
||||
table_name: String,
|
||||
#[snafu(backtrace)]
|
||||
source: table::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Failed to insert value into table: {}, source: {}",
|
||||
table_name,
|
||||
source
|
||||
))]
|
||||
Insert {
|
||||
table_name: String,
|
||||
#[snafu(backtrace)]
|
||||
source: table::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to execute table scan, source: {}", source))]
|
||||
TableScanExec {
|
||||
#[snafu(backtrace)]
|
||||
source: common_query::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to parse data source url, source: {}", source))]
|
||||
ParseUrl {
|
||||
#[snafu(backtrace)]
|
||||
source: common_datasource::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to build data source backend, source: {}", source))]
|
||||
BuildBackend {
|
||||
#[snafu(backtrace)]
|
||||
source: common_datasource::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to list objects, source: {}", source))]
|
||||
ListObjects {
|
||||
#[snafu(backtrace)]
|
||||
source: common_datasource::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to read object in path: {}, source: {}", path, source))]
|
||||
ReadObject {
|
||||
path: String,
|
||||
location: Location,
|
||||
source: object_store::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to read parquet file, source: {}", source))]
|
||||
ReadParquet {
|
||||
source: parquet::errors::ParquetError,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to build parquet record batch stream, source: {}", source))]
|
||||
BuildParquetRecordBatchStream {
|
||||
location: Location,
|
||||
source: parquet::errors::ParquetError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to write parquet file, source: {}", source))]
|
||||
WriteParquet {
|
||||
#[snafu(backtrace)]
|
||||
source: storage::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Schema datatypes not match at index {}, expected table schema: {}, actual file schema: {}",
|
||||
index,
|
||||
table_schema,
|
||||
file_schema
|
||||
))]
|
||||
InvalidSchema {
|
||||
index: usize,
|
||||
table_schema: String,
|
||||
file_schema: String,
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -470,9 +388,7 @@ impl ErrorExt for Error {
|
||||
| Error::MissingInsertValues { .. }
|
||||
| Error::PrimaryKeyNotFound { .. }
|
||||
| Error::MissingMetasrvOpts { .. }
|
||||
| Error::ColumnNoneDefaultValue { .. }
|
||||
| Error::BuildRegex { .. }
|
||||
| Error::InvalidSchema { .. } => StatusCode::InvalidArguments,
|
||||
| Error::ColumnNoneDefaultValue { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
Error::NotSupported { .. } => StatusCode::Unsupported,
|
||||
|
||||
@@ -485,13 +401,10 @@ impl ErrorExt for Error {
|
||||
|
||||
Error::ParseSql { source } => source.status_code(),
|
||||
|
||||
Error::Table { source }
|
||||
| Error::CopyTable { source, .. }
|
||||
| Error::Insert { source, .. } => source.status_code(),
|
||||
Error::Table { source } => source.status_code(),
|
||||
|
||||
Error::ConvertColumnDefaultConstraint { source, .. }
|
||||
| Error::CreateTableInfo { source }
|
||||
| Error::IntoVectors { source } => source.status_code(),
|
||||
| Error::CreateTableInfo { source } => source.status_code(),
|
||||
|
||||
Error::RequestDatanode { source } => source.status_code(),
|
||||
|
||||
@@ -534,6 +447,7 @@ impl ErrorExt for Error {
|
||||
Error::TableAlreadyExist { .. } => StatusCode::TableAlreadyExists,
|
||||
Error::EncodeSubstraitLogicalPlan { source } => source.status_code(),
|
||||
Error::InvokeDatanode { source } => source.status_code(),
|
||||
Error::ColumnDefaultValue { source, .. } => source.status_code(),
|
||||
|
||||
Error::External { source } => source.status_code(),
|
||||
Error::DeserializePartition { source, .. } | Error::FindTableRoute { source, .. } => {
|
||||
@@ -542,18 +456,6 @@ impl ErrorExt for Error {
|
||||
Error::UnrecognizedTableOption { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
Error::StartScriptManager { source } => source.status_code(),
|
||||
|
||||
Error::TableScanExec { source, .. } => source.status_code(),
|
||||
|
||||
Error::ReadObject { .. }
|
||||
| Error::ReadParquet { .. }
|
||||
| Error::BuildParquetRecordBatchStream { .. } => StatusCode::StorageUnavailable,
|
||||
|
||||
Error::ListObjects { source }
|
||||
| Error::ParseUrl { source }
|
||||
| Error::BuildBackend { source } => source.status_code(),
|
||||
|
||||
Error::WriteParquet { source, .. } => source.status_code(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,31 +12,23 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod copy_table_from;
|
||||
mod copy_table_to;
|
||||
mod describe;
|
||||
mod show;
|
||||
mod tql;
|
||||
|
||||
use catalog::CatalogManagerRef;
|
||||
use common_error::prelude::BoxedError;
|
||||
use common_query::Output;
|
||||
use common_recordbatch::RecordBatches;
|
||||
use datanode::instance::sql::table_idents_to_full_name;
|
||||
use query::parser::QueryStatement;
|
||||
use query::query_engine::SqlStatementExecutorRef;
|
||||
use query::QueryEngineRef;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use sql::statements::copy::{CopyTable, CopyTableArgument};
|
||||
use snafu::{ensure, ResultExt};
|
||||
use sql::statements::statement::Statement;
|
||||
use table::engine::TableReference;
|
||||
use table::requests::{CopyDirection, CopyTableRequest};
|
||||
use table::TableRef;
|
||||
|
||||
use crate::error::{
|
||||
CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu, PlanStatementSnafu,
|
||||
Result, SchemaNotFoundSnafu, TableNotFoundSnafu,
|
||||
CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, PlanStatementSnafu, Result,
|
||||
SchemaNotFoundSnafu,
|
||||
};
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -92,20 +84,13 @@ impl StatementExecutor {
|
||||
|
||||
Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx),
|
||||
|
||||
Statement::Copy(stmt) => {
|
||||
let req = to_copy_table_request(stmt, query_ctx)?;
|
||||
match req.direction {
|
||||
CopyDirection::Export => self.copy_table_to(req).await,
|
||||
CopyDirection::Import => self.copy_table_from(req).await,
|
||||
}
|
||||
}
|
||||
|
||||
Statement::CreateDatabase(_)
|
||||
| Statement::CreateTable(_)
|
||||
| Statement::CreateExternalTable(_)
|
||||
| Statement::Insert(_)
|
||||
| Statement::Alter(_)
|
||||
| Statement::DropTable(_)
|
||||
| Statement::Copy(_)
|
||||
| Statement::ShowCreateTable(_) => self
|
||||
.sql_stmt_executor
|
||||
.execute_sql(stmt, query_ctx)
|
||||
@@ -114,7 +99,11 @@ impl StatementExecutor {
|
||||
}
|
||||
}
|
||||
|
||||
async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
|
||||
pub(crate) async fn plan_exec(
|
||||
&self,
|
||||
stmt: QueryStatement,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Result<Output> {
|
||||
let planner = self.query_engine.planner();
|
||||
let plan = planner
|
||||
.plan(stmt, query_ctx.clone())
|
||||
@@ -140,50 +129,4 @@ impl StatementExecutor {
|
||||
|
||||
Ok(Output::RecordBatches(RecordBatches::empty()))
|
||||
}
|
||||
|
||||
async fn get_table(&self, table_ref: &TableReference<'_>) -> Result<TableRef> {
|
||||
let TableReference {
|
||||
catalog,
|
||||
schema,
|
||||
table,
|
||||
} = table_ref;
|
||||
self.catalog_manager
|
||||
.table(catalog, schema, table)
|
||||
.await
|
||||
.context(CatalogSnafu)?
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
table_name: table_ref.to_string(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<CopyTableRequest> {
|
||||
let direction = match stmt {
|
||||
CopyTable::To(_) => CopyDirection::Export,
|
||||
CopyTable::From(_) => CopyDirection::Import,
|
||||
};
|
||||
|
||||
let CopyTableArgument {
|
||||
location,
|
||||
connection,
|
||||
pattern,
|
||||
table_name,
|
||||
..
|
||||
} = match stmt {
|
||||
CopyTable::To(arg) => arg,
|
||||
CopyTable::From(arg) => arg,
|
||||
};
|
||||
let (catalog_name, schema_name, table_name) = table_idents_to_full_name(&table_name, query_ctx)
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
|
||||
Ok(CopyTableRequest {
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name,
|
||||
location,
|
||||
connection,
|
||||
pattern,
|
||||
direction,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -770,8 +770,7 @@ mod test {
|
||||
options: SortOptions::default(),
|
||||
}],
|
||||
Arc::new(merge),
|
||||
)
|
||||
.with_fetch(None);
|
||||
);
|
||||
assert_eq!(sort.output_partitioning().partition_count(), 1);
|
||||
|
||||
let session_ctx = SessionContext::new();
|
||||
|
||||
@@ -765,7 +765,7 @@ async fn test_delete(instance: Arc<dyn MockInstance>) {
|
||||
check_output_stream(output, expect).await;
|
||||
}
|
||||
|
||||
#[apply(both_instances_cases)]
|
||||
#[apply(standalone_instance_case)]
|
||||
async fn test_execute_copy_to_s3(instance: Arc<dyn MockInstance>) {
|
||||
if let Ok(bucket) = env::var("GT_S3_BUCKET") {
|
||||
if !bucket.is_empty() {
|
||||
@@ -803,7 +803,7 @@ async fn test_execute_copy_to_s3(instance: Arc<dyn MockInstance>) {
|
||||
}
|
||||
}
|
||||
|
||||
#[apply(both_instances_cases)]
|
||||
#[apply(standalone_instance_case)]
|
||||
async fn test_execute_copy_from_s3(instance: Arc<dyn MockInstance>) {
|
||||
logging::init_default_ut_logging();
|
||||
if let Ok(bucket) = env::var("GT_S3_BUCKET") {
|
||||
|
||||
@@ -54,7 +54,7 @@ async fn create_insert_query_assert(
|
||||
|
||||
let query_output = instance
|
||||
.statement_executor()
|
||||
.execute_stmt(QueryStatement::Promql(eval_stmt), QueryContext::arc())
|
||||
.plan_exec(QueryStatement::Promql(eval_stmt), QueryContext::arc())
|
||||
.await
|
||||
.unwrap();
|
||||
check_unordered_output_stream(query_output, expected).await;
|
||||
|
||||
@@ -1 +1 @@
|
||||
v0.1.0
|
||||
v0.2.1
|
||||
|
||||
@@ -23,7 +23,7 @@ use snafu::ResultExt;
|
||||
use crate::error::{BuildHttpResponseSnafu, Result};
|
||||
|
||||
#[derive(RustEmbed)]
|
||||
#[folder = "dashboard/"]
|
||||
#[folder = "dashboard/dist/"]
|
||||
pub struct Assets;
|
||||
|
||||
pub(crate) fn dashboard() -> Router {
|
||||
|
||||
@@ -75,22 +75,18 @@ impl QueryContext {
|
||||
|
||||
pub fn set_current_schema(&self, schema: &str) {
|
||||
let last = self.current_schema.swap(Arc::new(schema.to_string()));
|
||||
if schema != last.as_str() {
|
||||
debug!(
|
||||
"set new session default schema: {:?}, swap old: {:?}",
|
||||
schema, last
|
||||
)
|
||||
}
|
||||
debug!(
|
||||
"set new session default schema: {:?}, swap old: {:?}",
|
||||
schema, last
|
||||
)
|
||||
}
|
||||
|
||||
pub fn set_current_catalog(&self, catalog: &str) {
|
||||
let last = self.current_catalog.swap(Arc::new(catalog.to_string()));
|
||||
if catalog != last.as_str() {
|
||||
debug!(
|
||||
"set new session default catalog: {:?}, swap old: {:?}",
|
||||
catalog, last
|
||||
)
|
||||
}
|
||||
debug!(
|
||||
"set new session default catalog: {:?}, swap old: {:?}",
|
||||
catalog, last
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user