Add the example for WasmEdge + Kafka / Redpanda + MySQL application to take messages from a queue and save into a database table.

Signed-off-by: Michael Yuan <michael@secondstate.io>
This commit is contained in:
Michael Yuan
2022-11-20 07:06:21 +00:00
parent 1250b05a8a
commit cb096c6289
8 changed files with 292 additions and 0 deletions

View File

@@ -0,0 +1,17 @@
[package]
name = "kafka"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.65"
mega_etl = {git = "https://github.com/second-state/MEGA.git"}
tokio_wasi = {version = '1.21', features = ["rt", "macros"]}
env_logger = "0.9"
log = "0.4"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
http_req_wasi = "0.10"
lazy_static = "1.4.0"

View File

@@ -0,0 +1,26 @@
FROM --platform=$BUILDPLATFORM rust:1.64 AS buildbase
RUN <<EOT bash
set -ex
apt-get update
apt-get install -y \
git \
clang
rustup target add wasm32-wasi
EOT
# This line installs WasmEdge including the AOT compiler
RUN curl -sSf https://raw.githubusercontent.com/WasmEdge/WasmEdge/master/utils/install.sh | bash
FROM buildbase AS build
COPY Cargo.toml .
COPY src ./src
# Build the Wasm binary
RUN --mount=type=cache,target=/usr/local/cargo/git/db \
--mount=type=cache,target=/usr/local/cargo/registry/cache \
--mount=type=cache,target=/usr/local/cargo/registry/index \
cargo build --target wasm32-wasi --release
# This line builds the AOT Wasm binary
RUN /root/.wasmedge/bin/wasmedgec target/wasm32-wasi/release/kafka.wasm kafka.wasm
FROM scratch
ENTRYPOINT [ "kafka.wasm" ]
COPY --link --from=build /kafka.wasm /kafka.wasm

View File

@@ -0,0 +1,58 @@
use mega_etl::{async_trait, Pipe, Transformer, TransformerError, TransformerResult};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug)]
struct Order {
order_id: i32,
product_id: i32,
quantity: i32,
amount: f32,
shipping: f32,
tax: f32,
shipping_address: String,
}
#[async_trait]
impl Transformer for Order {
async fn transform(inbound_data: &Vec<u8>) -> TransformerResult<Vec<String>> {
let s = std::str::from_utf8(&inbound_data)
.map_err(|e| TransformerError::Custom(e.to_string()))?;
let order: Order = serde_json::from_str(String::from(s).as_str())
.map_err(|e| TransformerError::Custom(e.to_string()))?;
log::info!("{:?}", &order);
let mut ret = vec![];
let sql_string = format!(
r"INSERT INTO orders VALUES ({:?}, {:?}, {:?}, {:?}, {:?}, {:?}, {:?}, CURRENT_TIMESTAMP);",
order.order_id,
order.product_id,
order.quantity,
order.amount,
order.shipping,
order.tax,
order.shipping_address,
);
dbg!(sql_string.clone());
ret.push(sql_string);
Ok(ret)
}
async fn init() -> TransformerResult<String> {
Ok(String::from(
r"CREATE TABLE IF NOT EXISTS orders (order_id INT, product_id INT, quantity INT, amount FLOAT, shipping FLOAT, tax FLOAT, shipping_address VARCHAR(50), date_registered TIMESTAMP DEFAULT CURRENT_TIMESTAMP);",
))
}
}
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
env_logger::init();
// can use builder later
let database_uri = std::env::var("DATABASE_URL")?;
let kafka_uri = std::env::var("KAFKA_URL")?;
let mut pipe = Pipe::new(database_uri, kafka_uri).await;
// This is async because this calls the async transform() function in Order
pipe.start::<Order>().await?;
Ok(())
}