【 Abstract 】 Sqoop is a tool for efficiently transferring bulk data between Apache Hadoop and structured data stores such as relational databases. This article briefly describes the classes and methods involved in Sqoop job execution, relates that process to the MapReduce execution flow, and analyzes how data is migrated from source to destination.

Sqoop job execution process

Apart from the MapReduce implementation itself, Sqoop relies on five key classes during execution: Initializer, Partitioner, Extractor, Loader, and Destroyer. The execution process is shown in the figure below.

  • Initializer: the initialization stage — validates the source data, initializes parameters, and performs other preparation;
  • Partitioner: shards the source data, deciding how many slices to split it into based on the configured degree of parallelism;
  • Extractor: starts the extractor thread, which reads source data into an in-memory write queue according to the user's configuration;
  • Loader: starts the loader thread, which reads data from the queue and writes it to the destination;
  • Destroyer: disconnects Sqoop from the data sources and releases resources.

Therefore, every new connector must implement these five classes.
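The five hooks above can be sketched as a toy job life cycle. This is illustrative only — the class and method names are stand-ins invented for the demo, not the real Sqoop connector API:

```java
// Toy sketch of the five connector phases and the order a job invokes them.
import java.util.ArrayList;
import java.util.List;

public class JobLifecycle {
    // Runs the five phases in order and records each call in a trace.
    public static List<String> run(int partitions) {
        List<String> trace = new ArrayList<>();
        trace.add("initialize");                 // Initializer: validate source, prepare config
        for (int p = 0; p < partitions; p++) {   // Partitioner: one slice per mapper
            trace.add("extract[" + p + "]");     // Extractor: read slice, push to queue
            trace.add("load[" + p + "]");        // Loader: pop from queue, write destination
        }
        trace.add("destroy");                    // Destroyer: close connections, clean up
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run(2));
    }
}
```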


The Initializer is called before the Sqoop task is submitted to MapReduce, and handles pre-migration preparation such as connecting to the data source, creating temporary tables, and adding dependent JARs. It is the first step in the Sqoop job life cycle. Its main APIs are as follows:

public abstract void initialize(InitializerContext context,
    LinkConfiguration linkConfiguration, JobConfiguration jobConfiguration);

public List<String> getJars(InitializerContext context,
    LinkConfiguration linkConfiguration, JobConfiguration jobConfiguration) {
  return new LinkedList<String>();
}

public Schema getSchema(InitializerContext context,
    LinkConfiguration linkConfiguration, JobConfiguration jobConfiguration) {
  return new NullSchema();
}

The getSchema() method is used by the connector on the FROM or TO side to match data when extracting or loading it. For example, the GenericJdbcConnector calls it to obtain the source MySQL database name, table name, and the fields in the table.


The Destroyer is instantiated at the end of job execution; it is the last step in the Sqoop job life cycle. It cleans up after the task: deleting temporary tables, closing connections, and so on.

public abstract void destroy(DestroyerContext context,
    LinkConfiguration linkConfiguration, JobConfiguration jobConfiguration);


The Partitioner creates the partitions (shards). By default, Sqoop creates 10 shards. Its main API is as follows:

public abstract List<Partition> getPartitions(PartitionerContext context,
    LinkConfiguration linkConfiguration, FromJobConfiguration jobConfiguration);
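To make the sharding step concrete, here is a minimal sketch of what a getPartitions() implementation might do for a numeric split column: divide [min, max] into N contiguous ranges. The class and method names are hypothetical, not the GenericJdbcConnector code:

```java
// Illustrative range partitioner: split [min, max] into N contiguous shards.
import java.util.ArrayList;
import java.util.List;

public class RangePartitioner {
    // One shard: rows with lower <= key < upper.
    public static class Range {
        public final long lower, upper;
        Range(long lower, long upper) { this.lower = lower; this.upper = upper; }
        @Override public String toString() { return "[" + lower + "," + upper + ")"; }
    }

    public static List<Range> getPartitions(long min, long max, int shards) {
        List<Range> out = new ArrayList<>();
        long span = max - min;
        for (int i = 0; i < shards; i++) {
            long lo = min + span * i / shards;       // integer division keeps ranges contiguous
            long hi = min + span * (i + 1) / shards;
            out.add(new Range(lo, hi));
        }
        return out;
    }

    public static void main(String[] args) {
        // Default of 10 shards, as mentioned in the text.
        System.out.println(getPartitions(0, 1000, 10));
    }
}
```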

The Partition class declares the readFields() and write() methods so that partitions can be serialized and deserialized:

public abstract class Partition {
  public abstract void readFields(DataInput in) throws IOException;
  public abstract void write(DataOutput out) throws IOException;
  public abstract String toString();
}
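The write()/readFields() contract can be demonstrated with a toy partition holding a numeric range: whatever write() emits, readFields() must read back in the same order. This is plain java.io code, not the Sqoop Partition base class:

```java
// Toy partition demonstrating the write()/readFields() serialization contract.
import java.io.*;

public class RangePartition {
    long lower, upper;

    public void write(DataOutput out) throws IOException {
        out.writeLong(lower);   // fields are written...
        out.writeLong(upper);
    }

    public void readFields(DataInput in) throws IOException {
        lower = in.readLong();  // ...and read back in the same order
        upper = in.readLong();
    }

    // Serialize one partition to a byte buffer and read it back.
    public static RangePartition roundTrip(long lo, long hi) throws IOException {
        RangePartition p = new RangePartition();
        p.lower = lo; p.upper = hi;
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        p.write(new DataOutputStream(buf));
        RangePartition q = new RangePartition();
        q.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        return q;
    }

    public static void main(String[] args) throws IOException {
        RangePartition q = roundTrip(5, 42);
        System.out.println(q.lower + ".." + q.upper); // prints 5..42
    }
}
```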



The Extractor class extracts data from the source side based on the partition and configuration information, and writes it to SqoopMapDataWriter, an inner class of SqoopMapper that extends DataWriter. The data read from the source is wrapped in the SqoopWritable class, which stores it in an intermediate data format.

public abstract void extract(ExtractorContext context,
                          LinkConfiguration linkConfiguration,
                          FromJobConfiguration jobConfiguration,
                          SqoopPartition partition);

The core code inside the method is as follows:

while (resultSet.next()) {
  ...
  context.getDataWriter().writeArrayRecord(array);
  ...
}


The Loader receives data from the source side and loads it into the destination. It must implement the following interface:

public abstract void load(LoaderContext context,
                       ConnectionConfiguration connectionConfiguration,
                       JobConfiguration jobConfiguration) throws Exception;

The load method reads from SqoopOutputFormatDataReader, which supplies data in the intermediate data format, and loads it into the destination data source. The Loader must call the DataReader iteratively until all the data has been read.

while ((array = context.getDataReader().readArrayRecord()) != null) {
  ...
}

The MapReduce execution process

The previous section set aside the MapReduce execution process and described the migration only in terms of the Extractor and Loader. The following describes a Sqoop migration job flow in detail together with the MapReduce execution process.

Initialization

1) In the initialization stage of the job, SqoopInputFormat slices the source-side data:

  • The getSplits() method of SqoopInputFormat calls the getPartitions() method of the Partitioner class;
  • The returned list of Partitions is wrapped in SqoopSplit objects;
  • The default number of shards is 10.

Here, each Partition is assigned to one Mapper for execution. Each Mapper starts an extractor thread and a loader thread to migrate its slice of the data.
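The extractor/loader handoff described above can be modeled with two plain threads and a bounded queue standing in for Sqoop's internal data queue. This is a toy sketch; the sentinel-based shutdown is an assumption of this example, not Sqoop's actual mechanism:

```java
// Toy producer/consumer model of the extractor and loader threads.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ExtractLoadDemo {
    private static final String EOF = "__EOF__"; // sentinel marking end of data

    public static List<String> migrate(List<String> source) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(4);
        List<String> destination = new ArrayList<>();

        Thread extractor = new Thread(() -> {          // reads rows from the "source"
            try {
                for (String row : source) queue.put(row);
                queue.put(EOF);
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        Thread loader = new Thread(() -> {             // writes rows to the "destination"
            try {
                String row;
                while (!(row = queue.take()).equals(EOF)) destination.add(row);
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });

        extractor.start(); loader.start();
        extractor.join(); loader.join();
        return destination;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(migrate(List.of("r1", "r2", "r3")));
    }
}
```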


2) The Mapper process in the job execution stage:

  • SqoopMapper contains a SqoopMapDataWriter inner class;
  • Mapper's run() calls the Extractor.extract() method, which iteratively retrieves the source data and then calls the DataWriter to write it into the Context.

private class SqoopMapDataWriter extends DataWriter {
  ...
  private void writeContent() {
    ...
    context.write(writable, NullWritable.get()); // `writable` is a SqoopWritable object
    ...
  }
  ...
}


Note: the Context here holds key-value pairs; the key is a SqoopWritable and the value is just an empty NullWritable. SqoopWritable implements write() and readFields() for serialization and deserialization.


3) The Reduce process in the job execution stage:

  • SqoopOutputFormatLoadExecutor wraps three inner classes: SqoopOutputFormatDataReader, SqoopRecordWriter, and ConsumerThread;
  • SqoopNullOutputFormat calls getRecordWriter() to create a thread named ConsumerThread, as follows:

public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {
  executorService = Executors.newSingleThreadExecutor(...);
  consumerFuture = executorService.submit(new ConsumerThread(context));
  return writer;
}


  • ConsumerThread implements the Runnable interface; inside the thread it calls the loader.load(...) method, which iteratively reads SqoopWritable objects from the Context via the DataReader and writes the intermediate-format data to the destination database.

private class ConsumerThread implements Runnable {
  ...
  public void run() {
    ...
    loader.load(loaderContext, connectorLinkConfig, connectorToJobConfig);
    ...
  }
  ...
}
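The getRecordWriter() pattern — a single-thread executor running the consumer, with a Future used to wait for it and surface its errors — looks like this in miniature. This is toy code; consumeAll and its summing logic are invented for the demo:

```java
// Miniature version of the single-thread-executor + Future consumer pattern.
import java.util.concurrent.*;

public class ConsumerThreadDemo {
    public static int consumeAll(BlockingQueue<Integer> queue, int count)
            throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // The "ConsumerThread": drains `count` items and returns their sum.
        Future<Integer> consumerFuture = executor.submit(() -> {
            int sum = 0;
            for (int i = 0; i < count; i++) sum += queue.take();
            return sum;
        });
        for (int i = 1; i <= count; i++) queue.put(i);  // producer side
        int result = consumerFuture.get();  // blocks until the consumer finishes,
                                            // rethrowing any failure it hit
        executor.shutdown();
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(consumeAll(new LinkedBlockingQueue<>(), 4)); // sums 1+2+3+4
    }
}
```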



  • In local mode, if Sqoop submits a task without setting SqoopReducer.class, MapReduce invokes a default reducer class;
  • setContent() is SqoopRecordWriter.write(...), which converts a SqoopWritable into the intermediate storage format, the IntermediateDataFormat; conversely, getContent() reads data from this intermediate storage format;
  • Sqoop defines a pluggable abstract class, IntermediateDataFormat, which SqoopWritable wraps to hold the intermediate data.
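The idea of a pluggable intermediate format can be illustrated with a minimal stand-in that exposes both an object-array view and a text (CSV) view of a record. This is not Sqoop's IntermediateDataFormat API; every name here is invented for the sketch:

```java
// Toy intermediate format: one record, two views (object array and CSV text).
public class CsvIntermediateFormat {
    private Object[] record;

    public void setObjectData(Object[] data) { record = data; }
    public Object[] getObjectData()          { return record; }

    // Text view: how the record would look while crossing the MR boundary.
    public String getTextData() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < record.length; i++) {
            if (i > 0) sb.append(',');
            sb.append(record[i]);
        }
        return sb.toString();
    }

    public void setTextData(String csv) { record = csv.split(","); }

    public static void main(String[] args) {
        CsvIntermediateFormat fmt = new CsvIntermediateFormat();
        fmt.setObjectData(new Object[]{"1", "alice", "2024"});
        System.out.println(fmt.getTextData()); // prints 1,alice,2024
    }
}
```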

The above covers the classes and methods involved in Sqoop job execution; I hope it helps you in your data migration work.

Follow us to be the first to learn about the latest Huawei Cloud technologies.