Livy profile

In addition to spark-submit, Spark-shell, and Thrift Server provided by Apache Spark, Apache Livy provides another way to interact with the Spark cluster through the REST interface

In addition,Apache Livy supports simultaneous maintenance of multiple sessions, which can be accessed through the REST interface, Java/Scala libraries, and Apache Zeppelin

Livy basic architecture

Livy is a typical REST service architecture. On the one hand, it receives and parses REST requests from users and converts them into corresponding operations. On the other hand, it manages all Spark clusters started by users

You can start a session using Livy in REST request mode. A session consists of a Spark cluster, and the Spark cluster communicates with the Livy server through RPC. Depending on how interactions are handled, Livy divides sessions into two types:

1. Interactive session: Similar to Spark, after a session is started, code fragments submitted by users are sent to the remote Spark cluster for compilation and execution

2. Batch Session: Users can start Spark applications in batch mode using LIVY. This mode is called a batch session in LiVY and is the same as the batch session in Spark

Livy deployment

Enter livy.apache.org/download/ download Apache Livy (note that making a Cloudera Livy), these are two different Livy, pay attention to distinguish, download after a successful execution unzip the decompression

1. Download and decompress the package

Unzip wget https://www.apache.org/dyn/closer.lua/incubator/livy/0.6.0-incubating/apache-livy-0.6.0-incubating-bin.zip Apache - livy - 0.6.0 - incubating - bin. Zip mv apache - livy - 0.6.0 - incubating - bin/usr /local/livy
Copy the code

2. The configuration

cd /usr/local/livy/conf
cp livy-env.sh.template livy-env.sh
vi livy-env.sh
      export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop/conf
      export SPARK_HOME=$SPARK_HOME
      
cp livy.conf.template livy.conf
vi livy.conf 
           livy.spark.master=yarn-client # Use yarn mode
           livy.spark.deploy-mode = client
           livy.repl.enable-hive-context = true
           livy.server.session.kind=yarn
           livy.server.recovery.mode = recovery  # Recovery mechanism
           livy.server.recovery.state-store = zookeeper / filesystem There are two save mechanisms, one is file, the other is ZooKeeper
           livy.server.recovery.state-store.url = zookeeper_host:port
           livy.file.upload.max.size=107059609600 # livy File upload limit, default limit is 100 MB, Jar package is larger than 100 MB, raise Too large request or file
Copy the code

Other optional configurations are:

livy.server.host The default host address is 0.0.0.0.
livy.server.port The default port number is 8998.
livy.server.session.timeout-check # whether to detect session timeout. Default is true.
livy.server.session.timeout Session timeout duration (default: 1h)
livy.server.session.state-retain.sec Session retention time (default: 600s)
livy.rsc.jars # RSC JAR package location, cached in HDFS, can speed up the session start speed;
livy.repl.jars # REPL JAR package location, cached in HDFS, can speed up the start of the session
livy.server.yarn.poll-interval # Refresh frequency of YARN state. The default value is 5s.
livy.ui.enabled # whether to enable the UI interface, default is true;
Copy the code

3. Start and stop the service

cd /usr/local/livy/bin
./livy-server start # start service
./livy-server stop  # stop service
Copy the code

4. Access the interface

Visit the page http://bigdatatest-1:8999The default port is 8998, and the livy service port is 8999 when Ambari is used to deploy the cluster
Copy the code

Use the Postman test tool to learn about the Livy interface

Livy official document:Livy.apache.org/docs/latest…

1. New session

post http://bigdatatest-1:8999/sessions
Copy the code

2. Check the session status

GET http://bigdatatest-1:8999/sessions
Copy the code

3. Submit code snippets

Note: POST http://bigdatatest-1:8999/sessions/130/statements If the submitted code snippet is shared within the same session, equivalent to sesssions/ 130:130 in the same Spark shell. This number is the ID value obtained when the new session is started (the result of the first step)Copy the code

4. View the code execution result

GET http://bigdatatest-1:8999/sessions/0/statements/0 note: statements / 0:0 is behind the id value returned by the step 3 to submit code snippetsCopy the code

5. To kill the session

delete http://bigdatatest-1:8999/sessions/130
Copy the code

6. Submit the Spark batch task

POST http://bigdatatest-1:8999/batches
Copy the code

Note: For the SparkPi example used in the example, the jar package path is under $SPARK_HOME/examples/jars/

7. View the Spark task result

GET http://bigdatatest-1:8999/batches
Copy the code

Livy Java API

Using the Java API requires only the following steps:

1. Add maven dependencies

2. Implement the org.apache.livy.Job interface

3. Build LivyClient based on URL addresses

4. Upload the dependent JAR package —-> Notice If the jar package file is too large, an exception will be thrown

5. Submit the Livy task and get the results

6. Close the client

1. Add mavan dependencies

<dependency> <groupId>org.apache.livy</groupId> <artifactId>livy-client-http</artifactId> < version > 0.5.0 - incubating < / version > < / dependency >Copy the code

2. Implement the Job interface

package com.ud.livy;

import org.apache.livy.Job;
import org.apache.livy.JobContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** * * CreateTime:2019/5/31 10:50 * Descriptor: */
public class TestJob implements Job {
    private static Logger logger = LoggerFactory.getLogger(TestJob.class);

    @Override
    public  List<String>  call(JobContext jobContext) throws Exception {
        JavaRDD<String> stringJavaRDD = jobContext.sc().parallelize(Arrays.asList("hello"."are"."you"."spark"."java"."java"."spark"."hadoop"));
        logger.info("Sample data :" + stringJavaRDD.first());
        JavaPairRDD<String, Integer> stringIntegerJavaPairRDD = stringJavaRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                returninteger + integer2; }}); List<Tuple2<String, Integer>> collect = stringIntegerJavaPairRDD.collect(); List<String> list =new ArrayList<>();
        for (Tuple2<String,Integer> result:collect){
            list.add("key:"+ result._1+" value:"+result._2);
        }

        returnlist; }}Copy the code

3. Build the Client, upload the Jar package, and submit the job to obtain the result

package com.ud.livy;

import org.apache.livy.LivyClient;
import org.apache.livy.LivyClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.URI;
import java.util.List;

/** * * CreateTime:2019/5/31 10:57 * Descriptor: */
public class LivyMain {
    private static Logger logger = LoggerFactory.getLogger(LivyMain.class);

    public static void main(String[] args) {
        LivyClient client = null;
        try {
            client = new LivyClientBuilder()
                    .setURI(new URI("http://bigdatatest-1:8999"))
                    .setConf("spark.debug.maxToStringFields"."10000000")
                    .setConf("spark.sql.parquet.binaryAsString"."true")
                    .setConf("spark.sql.parquet.writeLegacyForma"."true")
                    .setConf("spark.serializer"."org.apache.spark.serializer.KryoSerializer")
                    .setConf("spark.kryoserializer.buffer"."256")
                    .setConf("spark.yarn.queue"."llap")
                    .setConf("spark.app.name"."sparkTestSpark")
                    .setConf("spark.driver.memory"."2g")
                    .setConf("spark.executor.memory"."5g")
                    .setConf("spark.executor.instances"."3")
                    .setConf("spark.executor.cores"."4")
                    .setConf("spark.default.parallelism"."500")
                    .build();

            UploadJar () uploadJar() uploadJar() uploadJar() uploadJar() uploadJar() uploadJar() uploadJar() uploadJar() uploadJar(
            client.addJar(new URI("HDFS: / / 020 / bigdata bigdatatest - 8 - backtrack - final - 0.0.1 - the SNAPSHOT. Jar"));
/ / client. UploadJar (new File (" D: \ \ GitCode \ \ livyTest \ \ target \ \ bigdata backtrack - final - 0.0.1 - the SNAPSHOT. Jar ")), get ();
            List<String> results = ( List<String>) client.submit(new TestJob()).get();
            System.out.println(results);
        } catch (Exception e) {
                logger.error("Throw exception :",e);
        } finally {
                if(client! =null){
                    client.stop(true); }}}}Copy the code

4. Results presentation