Example Flink jobs for the "Apache Kafka, Apache Flink, and CrateDB" tutorial

About

Introduction

This repository accompanies the article "Build a data ingestion pipeline using Kafka, Flink, and CrateDB" and the "CrateDB Community Day #2" event. It supports related articles and publications around using Apache Flink and CrateDB, and is part of the "Building industrial IoT applications with open-source components and CrateDB" reference architecture series.

It is supplemented by a corresponding executable end-to-end tutorial for Apache Kafka, Apache Flink, and CrateDB, which provides a Kafka/Flink infrastructure on your workstation using Docker Compose, pulls the Flink job JAR file from the assets on the release page, and submits it to the Flink cluster.

Resources

To learn more about the technologies used here, please refer to the excellent documentation and resources around Apache Flink.

Details

Most of the Flink jobs demonstrated here connect to CrateDB through the Flink JDBC Connector, using both the vanilla PostgreSQL JDBC driver and the CrateDB adapter/dialect.

The first two jobs, both defined in io.crate.flink.demo, can be launched as standalone Java applications, without the need to submit them to a Flink cluster. The other job, defined in io.crate.streaming, is meant to be submitted to a Flink cluster for demonstration purposes, but can also be invoked interactively.

  • The SimpleJdbcSinkJob demonstrates a basic example which inserts a few records into CrateDB using Flink. It outlines how to use the fundamental Flink JdbcSink API, and how to adjust the corresponding JdbcExecutionOptions and JdbcConnectionOptions; see the first sketch after this list.
  • The SimpleTableApiJob demonstrates how to use the Flink Table API and the Flink DataStream API; see the second sketch after this list.

    The Flink Table API is a language-integrated query API for Java, Scala, and Python that allows composing queries from relational operators such as selection, filter, and join in an intuitive way, and offers a unified interface for both stream and batch processing.

    The Flink DataStream API offers the primitives of stream processing (namely time, state, and dataflow management) in a relatively low-level imperative programming API. The Table API abstracts away many internals and provides a structured and declarative API.

    Both APIs can work with bounded and unbounded streams. Bounded streams occur when processing historical data. Unbounded streams occur in real-time processing scenarios, which might first be initialized with historical data.

  • The TaxiRidesStreamingJob subscribes to an Apache Kafka topic as a data source, and stores received data into CrateDB as a data sink. The data stream is represented by records from the venerable NYC Yellow Taxi Trips dataset.
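
For orientation, here is a minimal sketch of a job in the spirit of SimpleJdbcSinkJob; it is not the repository's code verbatim. It assumes the Flink JDBC connector (flink-connector-jdbc) and the PostgreSQL JDBC driver on the classpath, and the table doc.fruits with its records is purely illustrative.

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SimpleJdbcSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A few illustrative records to insert.
        env.fromElements("apple", "banana", "cherry")
            .addSink(JdbcSink.sink(
                // The prepared statement; table and column names are illustrative.
                "INSERT INTO doc.fruits (name) VALUES (?)",
                (statement, name) -> statement.setString(1, name),
                // JdbcExecutionOptions: flush every two records, or after 200 ms at the latest.
                JdbcExecutionOptions.builder()
                    .withBatchSize(2)
                    .withBatchIntervalMs(200)
                    .withMaxRetries(3)
                    .build(),
                // JdbcConnectionOptions: connect using the vanilla PostgreSQL JDBC driver.
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withUrl("jdbc:postgresql://localhost:5432/")
                    .withDriverName("org.postgresql.Driver")
                    .withUsername("crate")
                    .build()));

        env.execute("SimpleJdbcSinkSketch");
    }
}

Running this as a standalone Java application spins up a local mini-cluster, so no separate Flink cluster is required, mirroring how the first two jobs can be launched.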
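
Likewise, a minimal Table API sketch, assuming the flink-table dependencies are available; the rows and column names are again illustrative, not taken from the repository.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.row;

public class SimpleTableApiSketch {
    public static void main(String[] args) {
        // Batch mode suffices for this bounded, in-memory example.
        TableEnvironment tableEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inBatchMode().build());

        // Compose a query from relational operators on a few literal rows.
        Table fruits = tableEnv
            .fromValues(row("apple", 3), row("banana", 5), row("cherry", 7))
            .as("name", "amount");

        Table result = fruits
            .filter($("amount").isGreater(3))
            .select($("name"), $("amount"));

        // Execute the bounded query and print the result to stdout.
        result.execute().print();
    }
}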

Usage

Prerequisites

Acquire and build the source code.

git clone https://github.com/crate/cratedb-flink-jobs
cd cratedb-flink-jobs
make build

TaxiRidesStreamingJob

  • make test will probe the job using flink info.
  • make submit will submit the job using flink run to a Flink cluster at localhost:8081.

SimpleJdbcSinkJob and SimpleTableApiJob

  • make run JOB=SimpleJdbcSinkJob
  • make run JOB=SimpleTableApiJob

Appendix

JDBC drivers

  • SimpleJdbcSinkJob uses the PostgreSQL JDBC driver
    Driver class: org.postgresql.Driver
    URL schema: postgresql://
  • TaxiRidesStreamingJob and SimpleTableApiJob use the CrateDB JDBC driver
    Driver class: io.crate.client.jdbc.CrateDriver
    URL schema: crate://
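
By way of example, and assuming default ports and hostnames, a complete JDBC URL typically looks like jdbc:postgresql://localhost:5432/ for the PostgreSQL driver, and jdbc:crate://localhost:5432/ for the CrateDB driver.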

These are the settings for the TaxiRidesStreamingJob.

Required settings

  • kafka.servers: Comma-separated list of Kafka brokers to connect to.
  • kafka.topic: Kafka topic to consume.
  • crate.hosts: Comma-separated list of CrateDB hosts, in the format <hostname>:<psql_port>[, ...].
    Example: crate-01.example.net:5432,crate-02.example.net:5432
  • crate.table: CrateDB table name.

Optional settings

  • kafka.group.id (default: default): Kafka consumer group ID.
  • kafka.offset (default: earliest): Kafka topic offset.
  • batch.interval.ms (default: 5000): Timeout in milliseconds to use for periodic flushing.
  • crate.schema (default: doc): CrateDB schema.
  • crate.user (default: crate): CrateDB user.
  • crate.password (default: <empty>): CrateDB user password.
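
A hypothetical submission of the TaxiRidesStreamingJob with explicit settings could then look like this, assuming the job accepts its settings as --key value arguments; the JAR path and all values are illustrative:

flink run path/to/cratedb-flink-jobs.jar \
  --kafka.servers localhost:9092 \
  --kafka.topic taxi-rides \
  --crate.hosts localhost:5432 \
  --crate.table taxi_rides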