Flink Development Platform: Apache Zeppelin — (Part 4). Advanced Usage
In this article, I would like to share some advanced usage tips for Flink on Zeppelin. I believe you will want to know them as early as possible.
- Dependency management
- Job concurrency & parallelism
- Sql job configuration
- Run multiple insert sql as one flink job
- Job isolation
- Keep job alive even when Zeppelin shutdown
- Multiple Hadoop & Hive
- Inline Configuration
- Zeppelin Rest API
- How to debug/diagnose
- Machine Learning (Alink)
Dependency management
Although you can use many languages (scala, python, sql) in Zeppelin, you usually need to import third party dependencies, such as the kafka connector, your existing udfs, the specific libraries your udfs need, and so on. Overall, there are 3 ways to manage dependencies.
- flink.execution.jars All these jars will be loaded into the flink interpreter’s classpath and shipped to the TM. Use it to specify any general jars that your flink job’s code depends on.
- flink.udf.jars It is very similar to flink.execution.jars, but Zeppelin will detect all the udf classes in these jars and register them for you automatically; the udf name is the class name.
- flink.execution.packages It is also very similar to flink.execution.jars, but instead of specifying jar files, you specify packages here. The artifact of each package and its transitive dependencies will be downloaded and put on the flink interpreter’s classpath. E.g. if you would like to use kafka in streaming sql, you can configure flink.execution.packages as follows (a sketch of setting it appears after the package list).
org.apache.flink:flink-connector-kafka_2.11:1.10.0,org.apache.flink:flink-connector-kafka-base_2.11:1.10.0,org.apache.flink:flink-json:1.10.0
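These properties can be set on the flink interpreter setting page, or inline via the generic configuration interpreter that is covered later in this article. A minimal sketch of the inline form, assuming the kafka packages above and that the %flink.conf paragraph runs before the flink session cluster starts:

%flink.conf

flink.execution.packages org.apache.flink:flink-connector-kafka_2.11:1.10.0,org.apache.flink:flink-connector-kafka-base_2.11:1.10.0,org.apache.flink:flink-json:1.10.0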
Job Concurrency & Parallelism
Unlike flink’s sql-client where you have to run sql statements one by one, in Zeppelin you can run multiple sql jobs simultaneously. By default, you can run at most 10 concurrent sql jobs for batch sql and 10 for streaming sql. There are 2 properties to configure the concurrency of batch sql and streaming sql:
- zeppelin.flink.concurrentBatchSql.max
- zeppelin.flink.concurrentStreamSql.max
Besides configuring concurrency, you can also specify the parallelism of each sql job via the paragraph local property parallelism.
Here’s one example where I set parallelism to 3, so that every shuffle stage of this flink job will have 3 tasks.
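A minimal sketch of such a paragraph, assuming source_table and sink_table are hypothetical tables that have already been registered:

%flink.ssql(parallelism=3)

insert into sink_table select * from source_table;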
Sql Job Configuration
In Zeppelin, you are not only able to configure your flink session cluster, but also able to configure each sql job. You can find all the available sql configurations here.
In each sql paragraph (%flink.bsql & %flink.ssql), you can use a set statement to configure the associated flink job. Here’s one example where I set table.exec.window-agg.buffer-size-limit (note that this set statement only affects the current paragraph, which means jobs in other paragraphs still use the default value of table.exec.window-agg.buffer-size-limit).
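A minimal sketch of such a paragraph; the value and the query are placeholders, and source_table/sink_table are hypothetical registered tables:

%flink.bsql

set table.exec.window-agg.buffer-size-limit=200000;

-- only the job submitted from this paragraph picks up the setting above
insert into sink_table select * from source_table;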
Run multiple insert sql as one flink job
In flink’s sql-client, one single insert into statement triggers one flink job. By default, Zeppelin behaves the same way as sql-client. But sometimes users would like to run multiple insert statements as one flink job. Fortunately, you can achieve that in Zeppelin via just one simple configuration: set the paragraph local property runAsOne to true. E.g. in the following screenshot, I have 2 insert statements, and I specify runAsOne to be true.
If I run this paragraph, it generates only one flink job which runs both of these 2 sql statements. (There’s one issue that the sources of the 2 insert statements are not merged; it will be fixed in flink 1.11.)
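A minimal sketch of such a paragraph, again with hypothetical tables:

%flink.ssql(runAsOne=true)

insert into sink_table_a select * from source_table;
insert into sink_table_b select * from source_table;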
Job Isolation
Job isolation is pretty crucial for streaming jobs: you don’t want one streaming job to affect another, because a streaming job failure usually causes more loss than a batch job failure. However, you will notice that all the streaming jobs you submitted in part-2 and part-3 share the same flink session, which means one job failure may affect the others. Fortunately, just a few configurations are needed to achieve job isolation in Zeppelin.
- Change the flink interpreter to be per note isolated. That means each note will launch a new flink session instead of sharing the same flink session.
- Write just one sql job in each note, so that this sql job runs in its own dedicated flink session.
So I would suggest you run each streaming sql job in an individual flink session.
Keep job alive even when Zeppelin shutdown
Besides job isolation, you also don’t want Zeppelin to affect your flink job. Even if Zeppelin goes down, you may still want your flink job to keep running (by default, the flink job is canceled and the flink cluster is shut down if it is in yarn mode). Fortunately, you can achieve that with just a few configurations (see the sketch after the list below).
- Set zeppelin.interpreter.close.cancel_job to be false
- Set flink.interpreter.close.shutdown_cluster to be false
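Both properties can be set on the flink interpreter setting page; a sketch of setting them inline via the generic configuration interpreter (run before the flink session cluster starts) would look like this:

%flink.conf

zeppelin.interpreter.close.cancel_job false
flink.interpreter.close.shutdown_cluster false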
Multiple Hadoop & Hive
Sometimes users have multiple hadoop clusters, e.g. one for dev and another for production. You can create multiple flink interpreters, each with a different HADOOP_CONF_DIR and HIVE_CONF_DIR. E.g. here I create another flink interpreter flink_dev
which has different settings for HADOOP_CONF_DIR and HIVE_CONF_DIR. In a notebook, you can just use %flink_dev, %flink_dev.bsql, %flink_dev.ssql to indicate that you would like to use flink_dev.
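For example, the flink_dev interpreter setting might point at dev config directories (the paths below are hypothetical):

HADOOP_CONF_DIR /etc/hadoop-dev/conf
HIVE_CONF_DIR /etc/hive-dev/conf

A paragraph then selects that interpreter explicitly:

%flink_dev.ssql

-- runs in the flink_dev session, against the dev hadoop/hive cluster
select * from dev_db.some_table;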
Inline Configuration
Until now, you will have noticed that all the configurations are set on the interpreter setting page. That is a global setting which affects all the flink session clusters, but most of the time you would like to set custom properties for each flink session cluster. Fortunately, you can achieve this via inline configuration. Inline configuration uses a special generic configuration interpreter which can be used by all Zeppelin interpreters. E.g. here I customize the execution mode and task manager memory via inline configuration. Note that the inline configuration must run before the flink session cluster is started, so most of the time it is the first paragraph of a note.
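A sketch of such a paragraph; here I assume flink.execution.mode and flink.tm.memory (task manager memory in MB) are the properties you want to override, and the values are only examples:

%flink.conf

flink.execution.mode yarn
flink.tm.memory 4096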
Zeppelin Rest API
Besides the interactive way of using Zeppelin, you can also use its rest api to submit flink jobs. There are 2 important ways of running a paragraph (i.e. submitting a flink job) via the rest api.
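Concretely, Zeppelin’s rest api lets you run a paragraph either synchronously or asynchronously. A sketch with placeholder noteId/paragraphId, assuming Zeppelin listens on localhost:8080:

# run a paragraph synchronously (the call returns after the paragraph finishes)
curl -X POST http://localhost:8080/api/notebook/run/<noteId>/<paragraphId>

# run a paragraph asynchronously (the call returns immediately; check job status separately)
curl -X POST http://localhost:8080/api/notebook/job/<noteId>/<paragraphId>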
For more details, you can refer to this link.
How to debug/diagnose
You can follow these steps to debug and diagnose any issue in the flink interpreter.
- Step 1. Check the error message in the Zeppelin frontend. Most of the time you can find clues in the error message shown in the Zeppelin frontend.
- Step 2. Take a look at the flink web ui if it is a job failure issue.
- Step 3. Check the flink interpreter log, which is located under ZEPPELIN_HOME/logs/zeppelin-interpreter-flink-*.log
- Step 4. Check the flink cluster log (check the yarn app log if it is in yarn mode).
Machine Learning (Alink)
Alink is a machine learning framework based on flink. Zeppelin integrates it as well, so you can do machine learning via Alink in Zeppelin. Here’s one simple tutorial.
Step 1. Install pyalink
pip install pyalink
Step 2. Copy the alink jars to flink’s lib folder. These alink jars are installed together with pyalink; you can find them in the site-packages of the pyalink install location, e.g. for me it is /anaconda3/lib/python3.7/site-packages/pyalink/lib
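For example, with the install location above (adjust the pyalink path and FLINK_HOME for your own installation):

cp /anaconda3/lib/python3.7/site-packages/pyalink/lib/*.jar $FLINK_HOME/lib/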
Step 3. Run a simple flink job to verify that pyalink works properly
The most important line of code here is
mlenv = useCustomEnv(gateway, b_env, bt_env_2, s_env, st_env_2)
mlenv is the entry point of Alink. Internally mlenv uses ExecutionEnvironment, StreamExecutionEnvironment, BatchTableEnvironment and StreamTableEnvironment, so here we need b_env, bt_env_2, s_env and st_env_2 to create mlenv. The reason we use bt_env_2 and st_env_2 is that Alink is based on the DataSet api, which is only supported by the flink planner (Alink needs to convert DataSet to flink table internally).
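A minimal sketch of such a verification paragraph; the csv path and schema are hypothetical, and I assume pyalink’s CsvSourceBatchOp here:

%flink.ipyflink

from pyalink.alink import *

# wire Alink to the environments Zeppelin provides
mlenv = useCustomEnv(gateway, b_env, bt_env_2, s_env, st_env_2)

# read a small local csv (hypothetical path/schema) and print it to verify the setup
source = CsvSourceBatchOp() \
    .setFilePath("/tmp/iris.csv") \
    .setSchemaStr("sepal_length double, sepal_width double, petal_length double, petal_width double, category string")
source.print()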
After that, you can use whatever algorithms Alink provides; for more details, you can refer to Alink’s official doc.
Flink on Zeppelin Series
- Flink Development Platform: Apache Zeppelin — (Part 1). Get Started
- Flink Development Platform: Apache Zeppelin — (Part 2). Batch
- Flink Development Platform: Apache Zeppelin — (Part 3). Streaming
- Flink Development Platform: Apache Zeppelin — (Part 4). Advanced Usage
Summary
- Dependency management
- Job concurrency & parallelism
- Sql job configuration
- Run multiple insert sql as one flink job
- Job isolation
- Keep job alive even when Zeppelin shutdown
- Multiple Hadoop & Hive
- Inline Configuration
- Zeppelin Rest API
- How to debug/diagnose
- Machine Learning (Alink)
The Zeppelin community keeps trying to improve and evolve the whole user experience of Flink on Zeppelin. You can join the Zeppelin slack to discuss with the community: http://zeppelin.apache.org/community.html#slack-channel
Besides this, I have also made a series of videos to show you how to do all of this; you can check them out via this youtube link.