feat!: Remove script crate and python feature (#5321)

* feat: exclude script crate

* chore: simplify feature

* feat: remove the script crate

* chore: remove python feature and some comments

* chore: fix warning
This commit is contained in:
Yingwen
2025-01-08 20:11:53 +08:00
committed by GitHub
parent 7f307a4cac
commit c19a56c79f
73 changed files with 85 additions and 14961 deletions

View File

@@ -130,7 +130,6 @@ mysql_async = { version = "0.33", default-features = false, features = [
] }
permutation = "0.4"
rand.workspace = true
script = { workspace = true, features = ["python"] }
serde_json.workspace = true
session = { workspace = true, features = ["testing"] }
table.workspace = true

View File

@@ -149,14 +149,6 @@ pub enum Error {
#[snafu(display("Failed to describe statement"))]
DescribeStatement { source: BoxedError },
#[snafu(display("Failed to insert script with name: {}", name))]
InsertScript {
name: String,
#[snafu(implicit)]
location: Location,
source: BoxedError,
},
#[snafu(display("Pipeline management api error"))]
Pipeline {
#[snafu(source)]
@@ -165,14 +157,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to execute script by name: {}", name))]
ExecuteScript {
name: String,
#[snafu(implicit)]
location: Location,
source: BoxedError,
},
#[snafu(display("Not supported: {}", feat))]
NotSupported { feat: String },
@@ -633,9 +617,7 @@ impl ErrorExt for Error {
CollectRecordbatch { .. } => StatusCode::EngineExecuteQuery,
InsertScript { source, .. }
| ExecuteScript { source, .. }
| ExecuteQuery { source, .. }
ExecuteQuery { source, .. }
| ExecutePlan { source, .. }
| ExecuteGrpcQuery { source, .. }
| ExecuteGrpcRequest { source, .. }

View File

@@ -67,7 +67,7 @@ use crate::prometheus_handler::PrometheusHandlerRef;
use crate::query_handler::sql::ServerSqlQueryHandlerRef;
use crate::query_handler::{
InfluxdbLineProtocolHandlerRef, LogQueryHandlerRef, OpenTelemetryProtocolHandlerRef,
OpentsdbProtocolHandlerRef, PipelineHandlerRef, PromStoreProtocolHandlerRef, ScriptHandlerRef,
OpentsdbProtocolHandlerRef, PipelineHandlerRef, PromStoreProtocolHandlerRef,
};
use crate::server::Server;
@@ -89,7 +89,6 @@ pub mod pprof;
pub mod prom_store;
pub mod prometheus;
pub mod result;
pub mod script;
mod timeout;
pub(crate) use timeout::DynamicTimeoutLayer;
@@ -464,7 +463,6 @@ impl From<JsonResponse> for HttpResponse {
#[derive(Clone)]
pub struct ApiState {
pub sql_handler: ServerSqlQueryHandlerRef,
pub script_handler: Option<ScriptHandlerRef>,
}
#[derive(Clone)]
@@ -490,15 +488,8 @@ impl HttpServerBuilder {
}
}
pub fn with_sql_handler(
self,
sql_handler: ServerSqlQueryHandlerRef,
script_handler: Option<ScriptHandlerRef>,
) -> Self {
let sql_router = HttpServer::route_sql(ApiState {
sql_handler,
script_handler,
});
pub fn with_sql_handler(self, sql_handler: ServerSqlQueryHandlerRef) -> Self {
let sql_router = HttpServer::route_sql(ApiState { sql_handler });
Self {
router: self
@@ -783,8 +774,6 @@ impl HttpServer {
"/promql",
routing::get(handler::promql).post(handler::promql),
)
.route("/scripts", routing::post(script::scripts))
.route("/run-script", routing::post(script::run_script))
.with_state(api_state)
}
@@ -1065,7 +1054,7 @@ mod test {
let instance = Arc::new(DummyInstance { _tx: tx });
let sql_instance = ServerSqlQueryHandlerAdapter::arc(instance.clone());
let server = HttpServerBuilder::new(HttpOptions::default())
.with_sql_handler(sql_instance, None)
.with_sql_handler(sql_instance)
.build();
server.build(server.make_app()).route(
"/test/timeout",

View File

@@ -1,143 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use axum::extract::{Query, RawBody, State};
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use serde::{Deserialize, Serialize};
use session::context::QueryContext;
use snafu::ResultExt;
use crate::error::{HyperSnafu, InvalidUtf8ValueSnafu};
use crate::http::result::error_result::ErrorResponse;
use crate::http::{ApiState, GreptimedbV1Response, HttpResponse};
macro_rules! json_err {
($e: expr) => {{
return HttpResponse::Error(ErrorResponse::from_error($e));
}};
($msg: expr, $code: expr) => {{
return HttpResponse::Error(ErrorResponse::from_error_message($code, $msg.to_string()));
}};
}
macro_rules! unwrap_or_json_err {
($result: expr) => {
match $result {
Ok(result) => result,
Err(e) => json_err!(e),
}
};
}
/// Handler to insert and compile script
#[axum_macros::debug_handler]
pub async fn scripts(
State(state): State<ApiState>,
Query(params): Query<ScriptQuery>,
RawBody(body): RawBody,
) -> HttpResponse {
if let Some(script_handler) = &state.script_handler {
let catalog = params
.catalog
.unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string());
let schema = params.db.as_ref();
if schema.is_none() || schema.unwrap().is_empty() {
json_err!("invalid schema", StatusCode::InvalidArguments)
}
let name = params.name.as_ref();
if name.is_none() || name.unwrap().is_empty() {
json_err!("invalid name", StatusCode::InvalidArguments);
}
let bytes = unwrap_or_json_err!(hyper::body::to_bytes(body).await.context(HyperSnafu));
let script =
unwrap_or_json_err!(String::from_utf8(bytes.to_vec()).context(InvalidUtf8ValueSnafu));
// Safety: schema and name are already checked above.
let query_ctx = Arc::new(QueryContext::with(&catalog, schema.unwrap()));
match script_handler
.insert_script(query_ctx, name.unwrap(), &script)
.await
{
Ok(()) => GreptimedbV1Response::from_output(vec![]).await,
Err(e) => json_err!(
format!("Insert script error: {}", e.output_msg()),
e.status_code()
),
}
} else {
json_err!(
"Script execution not supported, missing script handler",
StatusCode::Unsupported
);
}
}
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct ScriptQuery {
pub catalog: Option<String>,
pub db: Option<String>,
pub name: Option<String>,
#[serde(flatten)]
pub params: HashMap<String, String>,
}
/// Handler to execute script
#[axum_macros::debug_handler]
pub async fn run_script(
State(state): State<ApiState>,
Query(params): Query<ScriptQuery>,
) -> HttpResponse {
if let Some(script_handler) = &state.script_handler {
let catalog = params
.catalog
.unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string());
let start = Instant::now();
let schema = params.db.as_ref();
if schema.is_none() || schema.unwrap().is_empty() {
json_err!("invalid schema", StatusCode::InvalidArguments)
}
let name = params.name.as_ref();
if name.is_none() || name.unwrap().is_empty() {
json_err!("invalid name", StatusCode::InvalidArguments);
}
// Safety: schema and name are already checked above.
let query_ctx = Arc::new(QueryContext::with(&catalog, schema.unwrap()));
let output = script_handler
.execute_script(query_ctx, name.unwrap(), params.params)
.await;
let resp = GreptimedbV1Response::from_output(vec![output]).await;
resp.with_execution_time(start.elapsed().as_millis() as u64)
} else {
json_err!(
"Script execution not supported, missing script handler",
StatusCode::Unsupported
);
}
}

View File

@@ -254,31 +254,6 @@ where
}
}
/// ScriptInterceptor can track life cycle of a script request and customize or
/// abort its execution at given point.
pub trait ScriptInterceptor {
type Error: ErrorExt;
/// Called before script request is actually executed.
fn pre_execute(&self, _name: &str, _query_ctx: QueryContextRef) -> Result<(), Self::Error> {
Ok(())
}
}
pub type ScriptInterceptorRef<E> = Arc<dyn ScriptInterceptor<Error = E> + Send + Sync + 'static>;
impl<E: ErrorExt> ScriptInterceptor for Option<ScriptInterceptorRef<E>> {
type Error = E;
fn pre_execute(&self, name: &str, query_ctx: QueryContextRef) -> Result<(), Self::Error> {
if let Some(this) = self {
this.pre_execute(name, query_ctx)
} else {
Ok(())
}
}
}
/// LineProtocolInterceptor can track life cycle of a line protocol request
/// and customize or abort its execution at given point.
#[async_trait]

View File

@@ -51,26 +51,9 @@ pub type OpentsdbProtocolHandlerRef = Arc<dyn OpentsdbProtocolHandler + Send + S
pub type InfluxdbLineProtocolHandlerRef = Arc<dyn InfluxdbLineProtocolHandler + Send + Sync>;
pub type PromStoreProtocolHandlerRef = Arc<dyn PromStoreProtocolHandler + Send + Sync>;
pub type OpenTelemetryProtocolHandlerRef = Arc<dyn OpenTelemetryProtocolHandler + Send + Sync>;
pub type ScriptHandlerRef = Arc<dyn ScriptHandler + Send + Sync>;
pub type PipelineHandlerRef = Arc<dyn PipelineHandler + Send + Sync>;
pub type LogQueryHandlerRef = Arc<dyn LogQueryHandler + Send + Sync>;
#[async_trait]
pub trait ScriptHandler {
async fn insert_script(
&self,
query_ctx: QueryContextRef,
name: &str,
script: &str,
) -> Result<()>;
async fn execute_script(
&self,
query_ctx: QueryContextRef,
name: &str,
params: HashMap<String, String>,
) -> Result<Output>;
}
#[async_trait]
pub trait InfluxdbLineProtocolHandler {
/// A successful request will not return a response.

View File

@@ -14,38 +14,32 @@
use std::collections::HashMap;
use axum::body::{Body, Bytes};
use axum::extract::{Json, Query, RawBody, State};
use axum::extract::{Json, Query, State};
use axum::http::header;
use axum::response::IntoResponse;
use axum::Form;
use bytes::Bytes;
use headers::HeaderValue;
use http_body::combinators::UnsyncBoxBody;
use hyper::Response;
use mime_guess::mime;
use servers::http::GreptimeQueryOutput::Records;
use servers::http::{
handler as http_handler, script as script_handler, ApiState, GreptimeOptionsConfigState,
GreptimeQueryOutput, HttpResponse,
handler as http_handler, ApiState, GreptimeOptionsConfigState, GreptimeQueryOutput,
HttpResponse,
};
use servers::metrics_handler::MetricsHandler;
use session::context::QueryContext;
use table::test_util::MemTable;
use crate::{
create_testing_script_handler, create_testing_sql_query_handler, ScriptHandlerRef,
ServerSqlQueryHandlerRef,
};
use crate::create_testing_sql_query_handler;
#[tokio::test]
async fn test_sql_not_provided() {
let sql_handler = create_testing_sql_query_handler(MemTable::default_numbers_table());
let ctx = QueryContext::with_db_name(None);
ctx.set_current_user(auth::userinfo_by_name(None));
let api_state = ApiState {
sql_handler,
script_handler: None,
};
let api_state = ApiState { sql_handler };
for format in ["greptimedb_v1", "influxdb_v1", "csv", "table"] {
let query = http_handler::SqlQuery {
@@ -76,10 +70,7 @@ async fn test_sql_output_rows() {
let ctx = QueryContext::with_db_name(None);
ctx.set_current_user(auth::userinfo_by_name(None));
let api_state = ApiState {
sql_handler,
script_handler: None,
};
let api_state = ApiState { sql_handler };
let query_sql = "select sum(uint32s) from numbers limit 20";
for format in ["greptimedb_v1", "influxdb_v1", "csv", "table"] {
@@ -182,10 +173,7 @@ async fn test_dashboard_sql_limit() {
let sql_handler = create_testing_sql_query_handler(MemTable::specified_numbers_table(2000));
let ctx = QueryContext::with_db_name(None);
ctx.set_current_user(auth::userinfo_by_name(None));
let api_state = ApiState {
sql_handler,
script_handler: None,
};
let api_state = ApiState { sql_handler };
for format in ["greptimedb_v1", "csv", "table"] {
let query = create_query(format, "select * from numbers", Some(1000));
let sql_response = http_handler::sql(
@@ -228,10 +216,7 @@ async fn test_sql_form() {
let ctx = QueryContext::with_db_name(None);
ctx.set_current_user(auth::userinfo_by_name(None));
let api_state = ApiState {
sql_handler,
script_handler: None,
};
let api_state = ApiState { sql_handler };
for format in ["greptimedb_v1", "influxdb_v1", "csv", "table"] {
let form = create_form(format);
@@ -341,196 +326,6 @@ async fn test_metrics() {
assert!(text.contains("test_metrics counter"));
}
async fn insert_script(
script: String,
script_handler: ScriptHandlerRef,
sql_handler: ServerSqlQueryHandlerRef,
) {
let body = RawBody(Body::from(script.clone()));
let invalid_query = create_invalid_script_query();
let json = script_handler::scripts(
State(ApiState {
sql_handler: sql_handler.clone(),
script_handler: Some(script_handler.clone()),
}),
invalid_query,
body,
)
.await;
let HttpResponse::Error(json) = json else {
unreachable!()
};
assert_eq!(json.error(), "invalid schema");
let body = RawBody(Body::from(script.clone()));
let exec = create_script_query();
// Insert the script
let json = script_handler::scripts(
State(ApiState {
sql_handler: sql_handler.clone(),
script_handler: Some(script_handler.clone()),
}),
exec,
body,
)
.await;
let HttpResponse::GreptimedbV1(json) = json else {
unreachable!()
};
assert!(json.output().is_empty());
}
#[tokio::test]
async fn test_scripts() {
common_telemetry::init_default_ut_logging();
let script = r#"
@copr(sql='select uint32s as number from numbers limit 5', args=['number'], returns=['n'])
def test(n) -> vector[i64]:
return n;
"#
.to_string();
let sql_handler = create_testing_sql_query_handler(MemTable::default_numbers_table());
let script_handler = create_testing_script_handler(MemTable::default_numbers_table());
insert_script(script.clone(), script_handler.clone(), sql_handler.clone()).await;
// Run the script
let exec = create_script_query();
let json = script_handler::run_script(
State(ApiState {
sql_handler,
script_handler: Some(script_handler),
}),
exec,
)
.await;
let HttpResponse::GreptimedbV1(json) = json else {
unreachable!()
};
match &json.output()[0] {
GreptimeQueryOutput::Records(records) => {
let json = serde_json::to_string_pretty(&records).unwrap();
assert_eq!(5, records.num_rows());
assert_eq!(
json,
r#"{
"schema": {
"column_schemas": [
{
"name": "n",
"data_type": "Int64"
}
]
},
"rows": [
[
0
],
[
1
],
[
2
],
[
3
],
[
4
]
],
"total_rows": 5
}"#
);
}
_ => unreachable!(),
}
}
#[tokio::test]
async fn test_scripts_with_params() {
common_telemetry::init_default_ut_logging();
let script = r#"
@copr(sql='select uint32s as number from numbers limit 5', args=['number'], returns=['n'])
def test(n, **params) -> vector[i64]:
return n + int(params['a'])
"#
.to_string();
let sql_handler = create_testing_sql_query_handler(MemTable::default_numbers_table());
let script_handler = create_testing_script_handler(MemTable::default_numbers_table());
insert_script(script.clone(), script_handler.clone(), sql_handler.clone()).await;
// Run the script
let mut exec = create_script_query();
let _ = exec.0.params.insert("a".to_string(), "42".to_string());
let json = script_handler::run_script(
State(ApiState {
sql_handler,
script_handler: Some(script_handler),
}),
exec,
)
.await;
let HttpResponse::GreptimedbV1(json) = json else {
unreachable!()
};
match &json.output()[0] {
GreptimeQueryOutput::Records(records) => {
let json = serde_json::to_string_pretty(&records).unwrap();
assert_eq!(5, records.num_rows());
assert_eq!(
json,
r#"{
"schema": {
"column_schemas": [
{
"name": "n",
"data_type": "Int64"
}
]
},
"rows": [
[
42
],
[
43
],
[
44
],
[
45
],
[
46
]
],
"total_rows": 5
}"#
);
}
_ => unreachable!(),
}
}
fn create_script_query() -> Query<script_handler::ScriptQuery> {
Query(script_handler::ScriptQuery {
db: Some("test".to_string()),
name: Some("test".to_string()),
..Default::default()
})
}
fn create_invalid_script_query() -> Query<script_handler::ScriptQuery> {
Query(script_handler::ScriptQuery {
db: None,
name: None,
..Default::default()
})
}
fn create_query(format: &str, sql: &str, limit: Option<usize>) -> Query<http_handler::SqlQuery> {
Query(http_handler::SqlQuery {
sql: Some(sql.to_string()),

View File

@@ -117,7 +117,7 @@ fn make_test_app(tx: Arc<mpsc::Sender<(String, String)>>, db_name: Option<&str>)
})
}
let server = HttpServerBuilder::new(http_opts)
.with_sql_handler(instance.clone(), None)
.with_sql_handler(instance.clone())
.with_user_provider(Arc::new(user_provider))
.with_influxdb_handler(instance)
.build();

View File

@@ -109,7 +109,7 @@ fn make_test_app(tx: mpsc::Sender<String>) -> Router {
let instance = Arc::new(DummyInstance { tx });
let server = HttpServerBuilder::new(http_opts)
.with_sql_handler(instance.clone(), None)
.with_sql_handler(instance.clone())
.with_opentsdb_handler(instance)
.build();
server.build(server.make_app())

View File

@@ -138,7 +138,7 @@ fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
let is_strict_mode = false;
let instance = Arc::new(DummyInstance { tx });
let server = HttpServerBuilder::new(http_opts)
.with_sql_handler(instance.clone(), None)
.with_sql_handler(instance.clone())
.with_prom_handler(instance, true, is_strict_mode)
.build();
server.build(server.make_app())

View File

@@ -12,8 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
@@ -25,12 +24,9 @@ use datafusion_expr::LogicalPlan;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::query_engine::DescribeResult;
use query::{QueryEngineFactory, QueryEngineRef};
use script::engine::{CompileContext, EvalContext, Script, ScriptEngine};
use script::python::{PyEngine, PyScript};
use servers::error::{Error, NotSupportedSnafu, Result};
use servers::query_handler::grpc::{GrpcQueryHandler, ServerGrpcQueryHandlerRef};
use servers::query_handler::sql::{ServerSqlQueryHandlerRef, SqlQueryHandler};
use servers::query_handler::{ScriptHandler, ScriptHandlerRef};
use session::context::QueryContextRef;
use snafu::ensure;
use sql::statements::statement::Statement;
@@ -41,23 +37,16 @@ mod http;
mod interceptor;
mod mysql;
mod postgres;
mod py_script;
const LOCALHOST_WITH_0: &str = "127.0.0.1:0";
pub struct DummyInstance {
query_engine: QueryEngineRef,
py_engine: Arc<PyEngine>,
scripts: RwLock<HashMap<String, Arc<PyScript>>>,
}
impl DummyInstance {
fn new(query_engine: QueryEngineRef) -> Self {
Self {
py_engine: Arc::new(PyEngine::new(query_engine.clone())),
scripts: RwLock::new(HashMap::new()),
query_engine,
}
Self { query_engine }
}
}
@@ -113,51 +102,6 @@ impl SqlQueryHandler for DummyInstance {
}
}
#[async_trait]
impl ScriptHandler for DummyInstance {
async fn insert_script(
&self,
query_ctx: QueryContextRef,
name: &str,
script: &str,
) -> Result<()> {
let catalog = query_ctx.current_catalog();
let schema = query_ctx.current_schema();
let script = self
.py_engine
.compile(script, CompileContext::default())
.await
.unwrap();
script.register_udf().await;
let _ = self
.scripts
.write()
.unwrap()
.insert(format!("{catalog}_{schema}_{name}"), Arc::new(script));
Ok(())
}
async fn execute_script(
&self,
query_ctx: QueryContextRef,
name: &str,
params: HashMap<String, String>,
) -> Result<Output> {
let catalog = query_ctx.current_catalog();
let schema = query_ctx.current_schema();
let key = format!("{catalog}_{schema}_{name}");
let py_script = self.scripts.read().unwrap().get(&key).unwrap().clone();
Ok(py_script
.execute(params, EvalContext::default())
.await
.unwrap())
}
}
#[async_trait]
impl GrpcQueryHandler for DummyInstance {
type Error = Error;
@@ -219,10 +163,6 @@ fn create_testing_instance(table: TableRef) -> DummyInstance {
DummyInstance::new(query_engine)
}
fn create_testing_script_handler(table: TableRef) -> ScriptHandlerRef {
Arc::new(create_testing_instance(table)) as _
}
fn create_testing_sql_query_handler(table: TableRef) -> ServerSqlQueryHandlerRef {
Arc::new(create_testing_instance(table)) as _
}

View File

@@ -1,113 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use common_query::OutputData;
use common_recordbatch::RecordBatch;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::{StringVector, VectorRef};
use servers::error::Result;
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::ScriptHandler;
use session::context::QueryContextBuilder;
use table::test_util::MemTable;
use crate::create_testing_instance;
#[ignore = "rust-python backend is not active support at present"]
#[tokio::test]
async fn test_insert_py_udf_and_query() -> Result<()> {
let catalog = "greptime";
let schema = "test";
let name = "hello";
let script = r#"
@copr(returns=['n'])
def hello() -> vector[str]:
return 'hello';
"#;
let column_schemas = vec![
ColumnSchema::new("script", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("schema", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("name", ConcreteDataType::string_datatype(), false),
];
let columns: Vec<VectorRef> = vec![
Arc::new(StringVector::from(vec![script])),
Arc::new(StringVector::from(vec![schema])),
Arc::new(StringVector::from(vec![name])),
];
let raw_schema = Arc::new(Schema::new(column_schemas));
let recordbatch = RecordBatch::new(raw_schema, columns).unwrap();
let table = MemTable::table("scripts", recordbatch);
let query_ctx = Arc::new(
QueryContextBuilder::default()
.current_catalog(catalog.to_string())
.current_schema(schema.to_string())
.build(),
);
let instance = create_testing_instance(table);
instance
.insert_script(query_ctx.clone(), name, script)
.await?;
let output = instance
.execute_script(query_ctx.clone(), name, HashMap::new())
.await?;
match output.data {
OutputData::RecordBatches(batches) => {
let expected = "\
+-------+
| n |
+-------+
| hello |
+-------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}
_ => unreachable!(),
}
let res = instance
.do_query("select hello()", query_ctx)
.await
.remove(0)
.unwrap();
match res.data {
OutputData::AffectedRows(_) => (),
OutputData::RecordBatches(_) => {
unreachable!()
}
OutputData::Stream(s) => {
let batches = common_recordbatch::util::collect_batches(s).await.unwrap();
let expected = "\
+---------+
| hello() |
+---------+
| hello |
+---------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}
}
Ok(())
}