Flink Development Platform: Apache Zeppelin — (Part 2). Batch
In Flink on Zeppelin (Part 1), I talked about how to set up Flink on Zeppelin and run word count. In this article, I will talk about how to do batch processing in Flink on Zeppelin via Flink SQL + UDF. Most of the batch problems we meet can actually be solved via Flink SQL + UDF, and Zeppelin is well suited for writing UDF + SQL because it provides an interactive development experience.
Overall, you can use Flink SQL + UDF in 2 main scenarios.
- Batch ETL (Data cleaning and transformation)
- Exploratory Data Analytics
Batch ETL
Now I will demonstrate how to do ETL on bank data via Flink SQL + UDF. (You can find the examples in Zeppelin’s built-in tutorial notes: Flink Tutorial/Batch ETL and Flink Tutorial/Exploratory Data Analytics.)
First you need to download the bank raw data and preview it via the shell interpreter of Zeppelin. You will notice that the first row contains the field names and the following rows contain the actual data; each field is delimited by a semicolon and wrapped in double quotes.
So the target of this ETL is to remove the quotes around each field and then use the first row as the field names of a structured table.
Step 1. Create a source table to represent the raw data.
We will use %flink.bsql to indicate that the following SQL statements are batch SQL, which will be executed via BatchTableEnvironment.
You can write multiple SQL statements in one paragraph; just separate them with semicolons.
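Here is a minimal sketch of what the source table DDL could look like. The table name, file path and connector properties below are illustrative (they follow the legacy CSV/filesystem descriptors of Flink 1.10); the trick is to use a newline as the field delimiter so that each raw line lands in a single STRING column we can parse later.

%flink.bsql

DROP TABLE IF EXISTS bank_raw;

-- each line of the raw file becomes one row with a single 'content' column
CREATE TABLE bank_raw (
  content STRING
) WITH (
  'connector.type' = 'filesystem',
  'connector.path' = '/tmp/bank.csv',      -- illustrative path to the downloaded raw data
  'format.type' = 'csv',
  'format.field-delimiter' = '\n',         -- keep the whole line as one field
  'format.derive-schema' = 'true'
);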
Step 2. Create a sink table to represent the cleaned and structured data.
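For reference, here is a sink table sketch using a handful of the bank fields (the column names and types are assumptions based on the bank dataset; the actual tutorial note may use more columns):

%flink.bsql

DROP TABLE IF EXISTS bank;

-- cleaned, structured output of the ETL
CREATE TABLE bank (
  age INT,
  job STRING,
  marital STRING,
  education STRING,
  balance INT
) WITH (
  'connector.type' = 'filesystem',
  'connector.path' = '/tmp/bank_cleaned',  -- illustrative output path
  'format.type' = 'csv',
  'format.derive-schema' = 'true'
);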
Step 3. Define a table function to parse the raw data.
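A minimal sketch of such a table function in Scala (the class name, the emitted columns and the header check are assumptions; it splits each line on semicolons and strips the double quotes):

%flink

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

// Splits a raw line on ';', strips the surrounding double quotes,
// skips the header row and emits (age, job, marital, education, balance).
class Parse extends TableFunction[Row] {
  def eval(line: String): Unit = {
    val tokens = line.split(";").map(_.replaceAll("\"", ""))
    if (tokens(0) != "age") {
      collect(Row.of(Integer.valueOf(tokens(0)), tokens(1), tokens(2),
        tokens(3), Integer.valueOf(tokens(5))))
    }
  }
  override def getResultType = Types.ROW(
    Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.INT)
}

btenv.registerFunction("parse", new Parse())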
Step 4. Use an insert into statement to do the batch ETL (read from the source table, do some simple processing, then write to the sink table).
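Following the sketches above, the insert statement could look like this (table and function names are the illustrative ones from the previous steps):

%flink.bsql

INSERT INTO bank
SELECT T.age, T.job, T.marital, T.education, T.balance
FROM bank_raw, LATERAL TABLE(parse(content))
  AS T(age, job, marital, education, balance)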
Step 5. After the batch ETL job completes, you can use a select statement to preview the data in the sink table to verify the ETL.
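For example:

%flink.bsql

SELECT * FROM bank LIMIT 10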
Exploratory Data Analytics
After the above ETL work is done, you have clean and structured data. Now you can query the table to do exploratory data analytics. Of course, you can leverage Zeppelin’s dynamic forms for more interactivity. Here are a few examples.
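For instance, a sketch of a query using a Zeppelin dynamic form (${maxAge=30} renders an input form above the paragraph with a default of 30; the form name is illustrative):

%flink.bsql

SELECT age, COUNT(1) AS total
FROM bank
WHERE age < ${maxAge=30}
GROUP BY age
ORDER BY age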
Flink UDF
SQL is a powerful language, but its expressiveness is limited. Sometimes we need to leverage UDFs to express more complicated logic. The Flink interpreter in Zeppelin supports 2 kinds of Flink UDFs (Scala & Python).
Here I will demonstrate 2 examples.
Scala UDF
%flink

import org.apache.flink.table.functions.ScalarFunction

class ScalaUpper extends ScalarFunction {
  def eval(str: String) = str.toUpperCase
}

btenv.registerFunction("scala_upper", new ScalaUpper())
Python UDF
In order to use Python UDFs, you need to install PyFlink via the following command:
pip install apache-flink
For distributed mode (remote/yarn), you need to install it on all nodes of the Flink cluster.
%flink.pyflink

from pyflink.table import DataTypes
from pyflink.table.udf import udf, ScalarFunction

class PythonUpper(ScalarFunction):
    def eval(self, s):
        return s.upper()

bt_env.register_function("python_upper",
    udf(PythonUpper(), DataTypes.STRING(), DataTypes.STRING()))
After you define these UDFs, you can use them in Flink SQL.
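For example, applying them to the bank table from the ETL section (illustrative):

%flink.bsql

SELECT scala_upper(education), python_upper(job) FROM bank LIMIT 10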
Register UDF via flink.udf.jars
Although you can create UDFs in Zeppelin, it would be inconvenient if there are many UDFs you need to register manually, or if a UDF is pretty complicated (long and complicated code is not suitable for writing in Zeppelin). In this scenario, you can write your UDFs in an IDE, build a jar, and just set flink.udf.jars to point to this jar. Zeppelin will detect all the UDFs in these jars and register them for you automatically (the UDF name is just the class name).
For example, here I set flink.udf.jars to one jar which contains lots of Flink UDFs (https://github.com/zjffdu/flink-udf). Then if I run show functions, I can see all the UDFs in this jar.
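A sketch (the jar path in the interpreter setting is illustrative, e.g. flink.udf.jars = /mnt/jars/flink-udf.jar):

%flink.bsql

SHOW FUNCTIONS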
Connect to Hive
By default, the metadata of the tables created in Flink is stored in memory. That means the metadata is lost if you restart your interpreter, and you need to register the tables again. Fortunately, Flink has another metadata store (HiveCatalog) which persists metadata into the Hive metastore.
In order to use HiveCatalog in Flink, you need to make the following configuration:
- Set zeppelin.flink.enableHive to be true
- Set zeppelin.flink.hive.version to be the hive version you are using.
- Set HIVE_CONF_DIR to be the location where hive-site.xml is located. Make sure the hive metastore is started and you have configured hive.metastore.uris in hive-site.xml.
- Copy the following dependencies to the flink lib folder.
a) flink-connector-hive_2.11-1.10.0.jar
b) flink-hadoop-compatibility_2.11-1.10.0.jar
c) hive-exec-2.x.jar (for hive 1.x, you need to copy hive-exec-1.x.jar, hive-metastore-1.x.jar, libfb303-0.9.2.jar and libthrift-0.9.2.jar)
Query tables in Hive
Besides querying tables created in Flink, you can also query tables in an existing Hive installation.
Now I will use one simple example to demonstrate how to query hive table in flink interpreter.
Step 1. Show tables via the command show tables. This command displays not only the tables created in Flink, but also the tables created in Hive.
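For example:

%flink.bsql

SHOW TABLES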
Step 2. Describe the table schema in Flink batch SQL. employee is a table in Hive, but you can still use the describe command to get its schema in Flink.
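For example:

%flink.bsql

DESCRIBE employee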
Step 3. Query the Hive table via Flink SQL.
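A minimal sketch (the limit clause is just to keep the preview small):

%flink.bsql

SELECT * FROM employee LIMIT 10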
Summary
This is how you can do batch processing in Flink on Zeppelin. Here are the highlights of this article:
- You can do most of the batch scenarios via SQL + UDF
- Batch ETL
- Exploratory data analytics
- UDF (Scala, Python, flink.udf.jars)
- Use HiveCatalog to persist table metadata
- Use HiveCatalog to query hive tables
The Zeppelin community is still improving and evolving the whole user experience of Flink on Zeppelin. You can join the Zeppelin Slack to discuss with the community: http://zeppelin.apache.org/community.html#slack-channel
Besides this article, I have also made a series of videos to show you how to do this; you can check them out via the Flink on Zeppelin videos link in the references below.
Flink on Zeppelin Series
- Flink Development Platform: Apache Zeppelin — (Part 1). Get Started
- Flink Development Platform: Apache Zeppelin — (Part 2). Batch
- Flink Development Platform: Apache Zeppelin — (Part 3). Streaming
- Flink Development Platform: Apache Zeppelin — (Part 4). Advanced Usage
References
- http://zeppelin.apache.org/
- Flink on Zeppelin videos
- https://zjffdu.gitbook.io/flink-on-zeppelin/
- https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/
- https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html
- https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/functions/systemFunctions.html
- https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/functions/udfs.html