diff --git a/wasmedge-kafka-mysql/.docker/docker-compose.yml b/wasmedge-kafka-mysql/.docker/docker-compose.yml new file mode 100644 index 0000000..a8decca --- /dev/null +++ b/wasmedge-kafka-mysql/.docker/docker-compose.yml @@ -0,0 +1,37 @@ +services: + redpanda: + image: docker.redpanda.com/vectorized/redpanda:v22.2.2 + command: + - redpanda start + - --smp 1 + - --overprovisioned + - --node-id 0 + - --kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 + - --advertise-kafka-addr PLAINTEXT://redpanda:29092,OUTSIDE://redpanda:9092 + - --pandaproxy-addr 0.0.0.0:8082 + - --advertise-pandaproxy-addr localhost:8082 + ports: + - 8081:8081 + - 8082:8082 + - 9092:9092 + - 9644:9644 + - 29092:29092 + volumes: + - ./kafka:/app + etl: + image: etl-kafka + build: + context: etl + platforms: + - wasi/wasm32 + environment: + DATABASE_URL: mysql://root:whalehello@db:3306/mysql + KAFKA_URL: kafka://redpanda:9092/order + RUST_BACKTRACE: full + RUST_LOG: info + restart: unless-stopped + runtime: io.containerd.wasmedge.v1 + db: + image: mariadb:10.9 + environment: + MYSQL_ROOT_PASSWORD: whalehello diff --git a/wasmedge-kafka-mysql/README.md b/wasmedge-kafka-mysql/README.md new file mode 100644 index 0000000..8fe059e --- /dev/null +++ b/wasmedge-kafka-mysql/README.md @@ -0,0 +1,115 @@ +# Compose sample application + +we demonstrate a microservice written in Rust. It subscribes to a Kafka queue topic on a Redpanda server, and then transforms and saves each message into a MySQL database (mariaDB) table. The microservice is compiled into WebAssembly (Wasm) and runs in the WasmEdge Runtime, which is a secure and lightweight alternative to natively compiled Rust apps in Linux containers. + +## Use with Docker Development Environments + +You will need a version of Docker Desktop or Docker CLI with Wasm support. + +* [Install Docker Desktop + Wasm (Beta)](https://docs.docker.com/desktop/wasm/) +* [Install Docker CLI + Wasm](https://github.com/chris-crone/wasm-day-na-22/tree/main/server) + +## WasmEdge server with Redpanda and MySQL database + +Project structure: + +``` +. ++-- compose.yml +|-- etl + |-- Dockerfile + |-- Cargo.toml + +-- src + |-- main.rs +|-- kafka + |-- order.json +|-- db + |-- db-password.txt +``` + +The [compose.yml](compose.yml) is as follows. + +```yaml +services: + redpanda: + image: docker.redpanda.com/vectorized/redpanda:v22.2.2 + command: + - redpanda start + - --smp 1 + - --overprovisioned + - --node-id 0 + - --kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 + - --advertise-kafka-addr PLAINTEXT://redpanda:29092,OUTSIDE://redpanda:9092 + - --pandaproxy-addr 0.0.0.0:8082 + - --advertise-pandaproxy-addr localhost:8082 + ports: + - 8081:8081 + - 8082:8082 + - 9092:9092 + - 9644:9644 + - 29092:29092 + volumes: + - ./kafka:/app + + etl: + image: etl-kafka + build: + context: etl + platforms: + - wasi/wasm32 + environment: + DATABASE_URL: mysql://root:whalehello@db:3306/mysql + KAFKA_URL: kafka://redpanda:9092/order + RUST_BACKTRACE: full + RUST_LOG: info + restart: unless-stopped + runtime: io.containerd.wasmedge.v1 + + db: + image: mariadb:10.9 + environment: + MYSQL_ROOT_PASSWORD: whalehello +``` + +The compose file defines an application with three services `redpanda`, `etl` and `db`. The `redpanda` is a Kafka-compatible messaging server that produces messages in a queue topic. The `etl` service, in the WasmEdge container, subscribes to the queue topic and receives incoming messages. Each incoming message is parsed and stored in the `db` MySQL database server. + +## Deploy with docker compose + +```bash +$ docker compose up -d +... + ⠿ Network wasmedge-kafka-mysql_default Created 0.1s + ⠿ Container wasmedge-kafka-mysql-redpanda-1 Created 0.3s + ⠿ Container wasmedge-kafka-mysql-etl-1 Created 0.3s + ⠿ Container wasmedge-kafka-mysql-db-1 Created 0.3s +``` + +## Expected result + +```bash +$ docker compose ps +NAME COMMAND SERVICE STATUS PORTS +wasmedge-kafka-mysql-db-1 "docker-entrypoint.s…" db running 3306/tcp +wasmedge-kafka-mysql-etl-1 "kafka.wasm" etl running +wasmedge-kafka-mysql-redpanda-1 "/entrypoint.sh 'red…" redpanda running 0.0.0.0:8081-8082->8081-8082/tcp, :::8081-8082->8081-8082/tcp, 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp, 0.0.0.0:9644->9644/tcp, :::9644->9644/tcp, 0.0.0.0:29092->29092/tcp, :::29092->29092/tcp +``` + +After the application starts, +log into the Redpanda container and send a message to the queue topic `order` as follows. + +```bash +$ docker compose exec redpanda /bin/bash +redpanda@1add2615774b:/$ cd /app +redpanda@1add2615774b:/app$ cat order.json | rpk topic produce order +Produced to partition 0 at offset 0 with timestamp 1667922788523. +``` + +To see the data in the database container, you can use the following commands. + +```bash +$ docker compose exec db /bin/bash +root@c97c472db02e:/# mysql -u root -pwhalehello mysql +mysql> select * from orders; +... ... +``` + diff --git a/wasmedge-kafka-mysql/compose.yml b/wasmedge-kafka-mysql/compose.yml new file mode 100644 index 0000000..a8decca --- /dev/null +++ b/wasmedge-kafka-mysql/compose.yml @@ -0,0 +1,37 @@ +services: + redpanda: + image: docker.redpanda.com/vectorized/redpanda:v22.2.2 + command: + - redpanda start + - --smp 1 + - --overprovisioned + - --node-id 0 + - --kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 + - --advertise-kafka-addr PLAINTEXT://redpanda:29092,OUTSIDE://redpanda:9092 + - --pandaproxy-addr 0.0.0.0:8082 + - --advertise-pandaproxy-addr localhost:8082 + ports: + - 8081:8081 + - 8082:8082 + - 9092:9092 + - 9644:9644 + - 29092:29092 + volumes: + - ./kafka:/app + etl: + image: etl-kafka + build: + context: etl + platforms: + - wasi/wasm32 + environment: + DATABASE_URL: mysql://root:whalehello@db:3306/mysql + KAFKA_URL: kafka://redpanda:9092/order + RUST_BACKTRACE: full + RUST_LOG: info + restart: unless-stopped + runtime: io.containerd.wasmedge.v1 + db: + image: mariadb:10.9 + environment: + MYSQL_ROOT_PASSWORD: whalehello diff --git a/wasmedge-kafka-mysql/db/db-password.txt b/wasmedge-kafka-mysql/db/db-password.txt new file mode 100644 index 0000000..21eee51 --- /dev/null +++ b/wasmedge-kafka-mysql/db/db-password.txt @@ -0,0 +1 @@ +whalehello diff --git a/wasmedge-kafka-mysql/etl/Cargo.toml b/wasmedge-kafka-mysql/etl/Cargo.toml new file mode 100644 index 0000000..657c1db --- /dev/null +++ b/wasmedge-kafka-mysql/etl/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "kafka" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0.65" +mega_etl = {git = "https://github.com/second-state/MEGA.git"} +tokio_wasi = {version = '1.21', features = ["rt", "macros"]} +env_logger = "0.9" +log = "0.4" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +http_req_wasi = "0.10" +lazy_static = "1.4.0" diff --git a/wasmedge-kafka-mysql/etl/Dockerfile b/wasmedge-kafka-mysql/etl/Dockerfile new file mode 100644 index 0000000..081e9bd --- /dev/null +++ b/wasmedge-kafka-mysql/etl/Dockerfile @@ -0,0 +1,26 @@ +FROM --platform=$BUILDPLATFORM rust:1.64 AS buildbase +RUN <) -> TransformerResult> { + let s = std::str::from_utf8(&inbound_data) + .map_err(|e| TransformerError::Custom(e.to_string()))?; + let order: Order = serde_json::from_str(String::from(s).as_str()) + .map_err(|e| TransformerError::Custom(e.to_string()))?; + log::info!("{:?}", &order); + let mut ret = vec![]; + let sql_string = format!( + r"INSERT INTO orders VALUES ({:?}, {:?}, {:?}, {:?}, {:?}, {:?}, {:?}, CURRENT_TIMESTAMP);", + order.order_id, + order.product_id, + order.quantity, + order.amount, + order.shipping, + order.tax, + order.shipping_address, + ); + dbg!(sql_string.clone()); + ret.push(sql_string); + Ok(ret) + } + + async fn init() -> TransformerResult { + Ok(String::from( + r"CREATE TABLE IF NOT EXISTS orders (order_id INT, product_id INT, quantity INT, amount FLOAT, shipping FLOAT, tax FLOAT, shipping_address VARCHAR(50), date_registered TIMESTAMP DEFAULT CURRENT_TIMESTAMP);", + )) + } +} + +#[tokio::main(flavor = "current_thread")] +async fn main() -> anyhow::Result<()> { + env_logger::init(); + + // can use builder later + let database_uri = std::env::var("DATABASE_URL")?; + let kafka_uri = std::env::var("KAFKA_URL")?; + let mut pipe = Pipe::new(database_uri, kafka_uri).await; + + // This is async because this calls the async transform() function in Order + pipe.start::().await?; + Ok(()) +} diff --git a/wasmedge-kafka-mysql/kafka/order.json b/wasmedge-kafka-mysql/kafka/order.json new file mode 100644 index 0000000..aeb1a38 --- /dev/null +++ b/wasmedge-kafka-mysql/kafka/order.json @@ -0,0 +1 @@ +{"order_id": 1,"product_id": 12,"quantity": 2,"amount": 56.0,"shipping": 15.0,"tax": 2.0,"shipping_address": "Mataderos 2312"}