Flink Development Platform: Apache Zeppelin — (Part 2). Batch

Jeff Zhang
6 min read · Mar 30, 2020

In Flink on Zeppelin (Part 1), I talked about how to set up Flink on Zeppelin and run word count. In this article, I will talk about how to do batch processing in Flink on Zeppelin via Flink SQL + UDF. In practice, most of the batch problems we meet can be solved with Flink SQL + UDF, and Zeppelin is a very suitable place to write UDF + SQL because it provides an interactive development experience.

Overall, you can use Flink SQL + UDF in 2 main scenarios.

  • Batch ETL (Data cleaning and transformation)
  • Exploratory Data Analytics

Batch ETL

Now I will demonstrate how to do ETL on bank data via Flink SQL + UDF. (You can find the examples in Zeppelin's built-in tutorial notes: Flink Tutorial/Batch ETL and Flink Tutorial/Exploratory Data Analytics.)

First you need to download the raw bank data and preview it via Zeppelin's shell interpreter. You will notice that the first row contains the field names and the remaining rows are the actual data; each field is delimited by a semicolon and surrounded by double quotes.

So the target of this ETL is to remove the quotes around each field and then use the first row as the table field names to create a structured table.

Step 1. Create a source table to represent the raw data.

We use %flink.bsql to indicate that the following SQL statements are batch SQL and will be executed via BatchTableEnvironment.

You can write multiple SQL statements in one paragraph; just separate them with semicolons.
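
A minimal sketch of such a source table is shown below. It assumes the raw file was downloaded to /tmp/bank.csv, and the WITH properties follow Flink 1.10's legacy connector descriptors, so double-check them against the DDL in the built-in tutorial note:

%flink.bsql

DROP TABLE IF EXISTS bank_raw;

-- Read each line of the raw file as a single STRING column, so that the
-- quotes and the header row can be handled later by a UDF.
CREATE TABLE bank_raw (
  content STRING
) WITH (
  'connector.type' = 'filesystem',
  'connector.path' = '/tmp/bank.csv',
  'format.type' = 'csv',
  'format.field-delimiter' = '\n',
  'format.derive-schema' = 'true'
)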

Step 2. Create a sink table to represent the cleaned and structured data.
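
For example, the sink table could look like the following sketch. The column names come from the header row of the bank data; I only keep a subset of the fields here, and the output path is an assumption:

%flink.bsql

DROP TABLE IF EXISTS bank;

-- Structured table holding the cleaned data.
CREATE TABLE bank (
  age INT,
  job STRING,
  marital STRING,
  education STRING,
  balance INT
) WITH (
  'connector.type' = 'filesystem',
  'connector.path' = '/tmp/bank_cleaned',
  'format.type' = 'csv',
  'format.derive-schema' = 'true'
)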

Step 3. Define a table function to parse the raw data, i.e. strip the double quotes and split each line into fields.

Step 4. Use an INSERT INTO statement to do the batch ETL (read from the source table, apply the parsing, then write to the sink table).
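
Assuming the table function from Step 3 is registered under the name parse (a name I picked for this sketch) and emits the five fields of the sink table, the whole ETL is a single statement:

%flink.bsql

-- Read from bank_raw, split and unquote each line with the parsing table
-- function, then write the structured rows into the bank sink table.
INSERT INTO bank
SELECT T.age, T.job, T.marital, T.education, T.balance
FROM bank_raw, LATERAL TABLE(parse(content)) AS T(age, job, marital, education, balance)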

Step 5. After the batch ETL job completes, use a SELECT statement to preview the data in the sink table and verify the ETL.
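
For example, a quick sanity check could be:

%flink.bsql

-- Preview a few rows of the cleaned table to verify the ETL result.
SELECT * FROM bank LIMIT 10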

Exploratory Data Analytics

After the above ETL work is done, you have clean and structured data, and you can query the table to do exploratory data analytics. Of course, you can also leverage Zeppelin's dynamic forms for more interactivity. Here are three examples; one of them is sketched below as SQL.
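
This sketch uses a Zeppelin text-input dynamic form (${maxAge=30}), so you can rerun the aggregation with a different age threshold without editing the SQL:

%flink.bsql

-- ${maxAge=30} renders a text-input form in the paragraph, with 30 as the default value.
SELECT age, COUNT(1) AS value
FROM bank
WHERE age < ${maxAge=30}
GROUP BY age
ORDER BY age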

Flink UDF

SQL is a powerful language, but its expressiveness is limited. Sometimes we need UDFs to express more complicated logic. The Flink interpreter in Zeppelin supports two kinds of Flink UDFs: Scala and Python.

Here I will demonstrate one example of each.

Scala UDF

%flink

class ScalaUpper extends ScalarFunction {
  def eval(str: String) = str.toUpperCase
}

btenv.registerFunction("scala_upper", new ScalaUpper())

Python UDF

In order to use Python UDFs, you need to install PyFlink via the following command:

pip install apache-flink

For distributed mode (remote/yarn), you need to install it on all the nodes of the Flink cluster.

%flink.pyflink

class PythonUpper(ScalarFunction):
    def eval(self, s):
        return s.upper()

bt_env.register_function("python_upper", udf(PythonUpper(), DataTypes.STRING(), DataTypes.STRING()))

After you define these UDFs, you can use them in Flink SQL.
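
For example, with the bank table from the ETL section above, the Scala UDF can be used like any built-in function (the Python UDF works the same way):

%flink.bsql

-- Apply the registered UDF to a column of the cleaned bank table.
SELECT scala_upper(education) AS education_upper, age
FROM bank
LIMIT 10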

Register UDF via flink.udf.jars

Although you can create UDFs in Zeppelin, it would be inconvenient if there are many UDFs to register manually, or if a UDF is complicated (it is not suitable to write long and complicated code in Zeppelin). In this scenario, you can write your UDFs in an IDE, build them into a jar, and point flink.udf.jars to this jar. Zeppelin will detect all the UDFs in these jars and register them for you automatically (the UDF name is just the class name).

For example, here I set flink.udf.jars to a jar which contains many Flink UDFs (https://github.com/zjffdu/flink-udf). Then if I run show functions, I can see all the UDFs in this jar.

Connect to Hive

By default, the metadata of tables created in Flink is stored in memory. That means the metadata is lost when you restart the interpreter, and you have to register the tables again. Fortunately, Flink has another metadata store, HiveCatalog, which persists metadata in the Hive metastore.

In order to use HiveCatalog in Flink, you need to make the following configuration:

  • Set zeppelin.flink.enableHive to be true
  • Set zeppelin.flink.hive.version to be the Hive version you are using.
  • Set HIVE_CONF_DIR to be the location where hive-site.xml is located. Make sure the Hive metastore is started and you have configured hive.metastore.uris in hive-site.xml.
  • Copy the following dependencies to the Flink lib folder.
    a) flink-connector-hive_2.11-1.10.0.jar
    b) flink-hadoop-compatibility_2.11-1.10.0.jar
    c) hive-exec-2.x.jar (for Hive 1.x, you need to copy hive-exec-1.x.jar, hive-metastore-1.x.jar, libfb303-0.9.2.jar and libthrift-0.9.2.jar)

Query tables in Hive

Besides querying tables created in Flink, you can also query tables in an existing Hive installation.

Now I will use a simple example to demonstrate how to query Hive tables in the Flink interpreter.

Step 1. Show tables via the command show tables. This command displays not only the tables created in Flink but also the tables created in Hive.

Step 2. Describe the table schema in Flink batch SQL. employee is a table in Hive, but you can still use the describe command to get its schema in Flink.

Step 3. Query the Hive table via Flink SQL.
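
Putting the three steps together, the paragraph could look like the sketch below (employee is the example Hive table from this section; its columns depend on your Hive installation):

%flink.bsql

-- Step 1: lists both the tables created in Flink and the existing Hive tables
SHOW TABLES;

-- Step 2: inspect the schema of the Hive table from Flink
DESCRIBE employee;

-- Step 3: query the Hive table via Flink SQL
SELECT * FROM employee LIMIT 10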

Summary

This is how you can do batch processing in Flink on Zeppelin. Here are the highlights of this article:

  • You can do most of the batch scenarios via SQL + UDF
  • Batch ETL
  • Exploratory data analytics
  • UDF (Scala, Python, flink.udf.jars)
  • Use HiveCatalog to persist table metadata
  • Use HiveCatalog to query Hive tables

The Zeppelin community keeps improving and evolving the whole user experience of Flink on Zeppelin. You can join the Zeppelin Slack to discuss with the community: http://zeppelin.apache.org/community.html#slack-channel

Besides this, I have also made a series of videos to show you how to do all of the above; you can check them out on YouTube.

Flink on Zeppelin Series
