Flink Development Platform: Apache Zeppelin — (Part 4). Advanced Usage

  • Dependency management
  • Job concurrency & parallelism
  • Sql job configuration
  • Run multiple insert sql as one flink job
  • Job isolation
  • Keep job alive even when Zeppelin shutdown
  • Multiple Hadoop & Hive
  • Inline Configuration
  • Zeppelin Rest API
  • How to debug/diagnose
  • Machine Learning (Alink)

Dependency management

Although you can use many languages (Scala, Python, SQL) in Zeppelin, you usually need to import third-party dependencies, such as the kafka connector, your existing udfs, or the libraries your udfs depend on. Overall, there are 3 ways to manage dependencies.

  • flink.execution.jars All the jars will be loaded into the flink interpreter’s classpath and shipped to the TaskManagers. Use it to specify any general jars that your flink job depends on.
  • flink.udf.jars It is very similar to flink.execution.jars, but Zeppelin will detect all the udf classes in these jars and register them for you automatically; the udf name is the class name.
  • flink.execution.packages It is also very similar to flink.execution.jars, but instead of specifying jar files, you specify packages here. The artifact of each package and its transitive dependencies will be downloaded and put on the flink interpreter’s classpath. E.g. if you would like to use kafka in streaming sql, you can configure flink.execution.packages as follows:
org.apache.flink:flink-connector-kafka_2.11:1.10.0,org.apache.flink:flink-connector-kafka-base_2.11:1.10.0,org.apache.flink:flink-json:1.10.0
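These properties can be set either on the interpreter setting page or in an inline configuration paragraph (covered later in this post). A sketch of such a paragraph wiring up the kafka packages above (the udf jar path is hypothetical):

```
%flink.conf

# download the kafka connector and its transitive dependencies
flink.execution.packages  org.apache.flink:flink-connector-kafka_2.11:1.10.0,org.apache.flink:flink-connector-kafka-base_2.11:1.10.0,org.apache.flink:flink-json:1.10.0

# ship a local udf jar; its udf classes are registered automatically
flink.udf.jars  /path/to/your-udf.jar
```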

Job Concurrency & Parallelism

Unlike flink’s sql-client, where you have to run sql statements one by one, in Zeppelin you can run multiple sql jobs simultaneously. By default, you can run at most 10 concurrent jobs each for batch sql and streaming sql. There are 2 properties to configure this concurrency:

  • zeppelin.flink.concurrentBatchSql.max
  • zeppelin.flink.concurrentStreamSql.max
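For example, to allow more concurrent streaming sql jobs, you could set the limits like this (a sketch; the default for both is 10):

```
%flink.conf

zeppelin.flink.concurrentBatchSql.max   10
zeppelin.flink.concurrentStreamSql.max  20
```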

Sql Job Configuration

In Zeppelin, you can configure not only your flink session cluster but also each individual sql job. You can find all the available sql configurations here.
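As a sketch of what per-job configuration looks like, you can put set statements at the top of a sql paragraph; the keys below are Flink’s standard table-config keys, and my_table is a hypothetical table:

```sql
%flink.bsql

-- sketch: per-job sql configuration set before the query
set table.exec.resource.default-parallelism=4;

select count(*) from my_table;
```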

Run multiple insert sql as one flink job

In flink’s sql-client, each single insert into statement triggers one flink job. By default, Zeppelin has the same behavior as sql-client. But sometimes users would like to run multiple insert statements as one flink job. Fortunately, you can achieve that in Zeppelin with just one simple configuration: set the paragraph local property runAsOne to true. E.g. in the following screenshot, I have 2 insert statements and I specify runAsOne as true.
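The screenshot is not reproduced here, but the paragraph looks roughly like this (table names are hypothetical):

```sql
%flink.ssql(runAsOne=true)

-- sketch: both inserts below are compiled into a single flink job
insert into sink_ok    select * from source_table where status <> 'error';
insert into sink_error select * from source_table where status = 'error';
```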

Job Isolation

Job isolation is pretty crucial for streaming jobs: you don’t want one streaming job to affect another, because a streaming job failure usually causes more loss than a batch job failure. However, you will notice that all the streaming jobs you submitted in Part 2 and Part 3 share the same flink session cluster, which means one job failure may affect the others. Fortunately, only a few configuration changes are needed to achieve job isolation in Zeppelin.

  • Change the flink interpreter to per-note isolated mode. That means each note will launch a new flink session cluster instead of sharing the same one.
  • Write just one sql job in each note, so that each sql job runs in its own dedicated flink session cluster.
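Put together, an isolated setup looks roughly like this (a sketch, one sql job per note with the interpreter in per-note isolated mode):

```
Note A  -> dedicated flink session cluster #1
  paragraph 1: %flink.conf          (per-cluster settings)
  paragraph 2: %flink.ssql(...)     (the single streaming job of this note)

Note B  -> dedicated flink session cluster #2
  paragraph 1: %flink.conf
  paragraph 2: %flink.ssql(...)
```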

Keep job alive even when Zeppelin shutdown

Besides job isolation, you also don’t want Zeppelin itself to affect your flink jobs. Even if Zeppelin goes down, you may still want your flink job to keep running (by default, the flink job will be canceled, and in yarn mode the flink cluster will be shut down as well). Fortunately, you can achieve that with just two configurations.

  • Set zeppelin.interpreter.close.cancel_job to be false
  • Set flink.interpreter.close.shutdown_cluster to be false
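In an inline configuration paragraph, that is just two lines (a sketch):

```
%flink.conf

zeppelin.interpreter.close.cancel_job      false
flink.interpreter.close.shutdown_cluster   false
```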

Multiple Hadoop & Hive

Sometimes users have multiple hadoop clusters, e.g. one for development and another for production. You can create multiple flink interpreters, each with a different HADOOP_CONF_DIR and HIVE_CONF_DIR. E.g. here I create another flink interpreter, flink_dev, with different settings for HADOOP_CONF_DIR and HIVE_CONF_DIR. In a notebook, you can then use %flink_dev, %flink_dev.bsql, or %flink_dev.ssql to indicate that you would like to use flink_dev.
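A sketch of the setup (the conf paths are hypothetical):

```
# interpreter setting page of flink_dev
HADOOP_CONF_DIR   /etc/hadoop-dev/conf
HIVE_CONF_DIR     /etc/hive-dev/conf

# in a notebook paragraph, target the dev cluster explicitly
%flink_dev.ssql
select ...
```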

Inline Configuration

Until now, all the configurations have been set on the interpreter setting page. That is a global setting which affects all flink session clusters, but most of the time you want to set custom properties for each flink session cluster. Fortunately, you can achieve this via inline configuration. Inline configuration uses a special generic configuration interpreter that works with all zeppelin interpreters. E.g. here I customize the execution mode and task manager memory via inline configuration. Note that the inline configuration must run before the flink session cluster is started, so most of the time it is the first paragraph of a note.
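A sketch of such a first paragraph (the memory value is just an example):

```
%flink.conf

flink.execution.mode   yarn
flink.tm.memory        4096
```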

Zeppelin Rest API

Besides using Zeppelin interactively, you can also use its rest api to submit flink jobs. There are 2 important approaches to running a paragraph (i.e. submitting a flink job): running it synchronously and running it asynchronously.
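As a sketch, these are the two notebook rest api calls (host, note id, and paragraph id below are placeholders):

```
# run a paragraph synchronously (returns when the paragraph finishes)
curl -X POST http://<zeppelin-host>:8080/api/notebook/run/<noteId>/<paragraphId>

# run a paragraph asynchronously (returns immediately; poll the paragraph status afterwards)
curl -X POST http://<zeppelin-host>:8080/api/notebook/job/<noteId>/<paragraphId>
```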

How to debug/diagnose

You can follow these steps to debug and diagnose any issue in the flink interpreter.

  • Step 1. Check the error message in the Zeppelin frontend. Most of the time you can find clues there.
  • Step 2. Take a look at the flink web ui if it is a job failure issue.
  • Step 3. Check the flink interpreter log, which is located under ZEPPELIN_HOME/logs/zeppelin-interpreter-flink-*.log
  • Step 4. Check the flink cluster log (check the yarn app log if it is running in yarn mode).
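For steps 3 and 4, a couple of commands that typically help (assuming a default ZEPPELIN_HOME layout and yarn mode):

```
# Step 3: tail the flink interpreter log
tail -f $ZEPPELIN_HOME/logs/zeppelin-interpreter-flink-*.log

# Step 4 (yarn mode): fetch the flink cluster log
yarn logs -applicationId <applicationId>
```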

Machine Learning (Alink)

Alink is a machine learning framework based on flink. Zeppelin also integrates it, so that you can do machine learning via Alink in Zeppelin. Here’s one simple example:

# install the Alink python package
pip install pyalink

# in a pyflink paragraph: build an Alink MLEnv from the flink
# environments that Zeppelin has already created for you
mlenv = useCustomEnv(gateway, b_env, bt_env_2, s_env, st_env_2)


Summary

  • Dependency management
  • Job concurrency & parallelism
  • Sql job configuration
  • Run multiple insert sql as one flink job
  • Job isolation
  • Keep job alive even when Zeppelin shutdown
  • Multiple Hadoop & Hive
  • Inline Configuration
  • Zeppelin Rest API
  • How to debug/diagnose
  • Machine Learning (Alink)

Jeff Zhang

Apache Member, Open source veteran, Big Data, Data Science,