Starting from Spoon's main method, KettleEnvironment.init() initializes the Kettle environment and loads the Steps.

ExecutorService executor = Executors.newCachedThreadPool();
Future<KettleException> pluginRegistryFuture =
  executor.submit( new Callable<KettleException>() {

    @Override
    public KettleException call() throws Exception {
      registerUIPluginObjectTypes();

      KettleClientEnvironment.getInstance().setClient( KettleClientEnvironment.ClientType.SPOON );
      try {
        KettleEnvironment.init();
      } catch ( KettleException e ) {
        return e;
      }

      return null;
    }
  } );

Related multithreading techniques

Thread pool

Executors.newCachedThreadPool() creates a thread pool whose number of threads is not fixed. Its task queue is a SynchronousQueue, which has no capacity: a submitted task is handed straight to a worker thread, and a new thread is created when no idle one is available.
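The pattern used in Spoon's main (submit a task that returns the exception instead of throwing it) can be sketched as follows. This is a minimal illustration, not Kettle code: the class name CachedPoolSketch and the helper runInit are hypothetical, and a plain Exception stands in for KettleException.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch class; not part of Kettle.
class CachedPoolSketch {

  // Runs the given initialization on a cached thread pool and returns the
  // exception it failed with, or null when it succeeded.
  static Exception runInit( Callable<Void> init ) {
    ExecutorService executor = Executors.newCachedThreadPool();
    try {
      Future<Exception> future = executor.submit( () -> {
        try {
          init.call();
        } catch ( Exception e ) {
          return e; // handed back as a value, not thrown
        }
        return null;
      } );
      return future.get(); // blocks until the task has finished
    } catch ( InterruptedException | ExecutionException e ) {
      throw new IllegalStateException( e );
    } finally {
      executor.shutdown(); // idle threads of a cached pool would otherwise linger
    }
  }

  public static void main( String[] args ) {
    Exception ok = runInit( () -> null );
    Exception failed = runInit( () -> {
      throw new IllegalStateException( "init failed" );
    } );
    System.out.println( "first result: " + ok );
    System.out.println( "second result: " + failed.getMessage() );
  }
}
```

Returning the exception as the Future's value lets the caller decide later, on its own thread, how to report a failed initialization.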

Thread concurrency

Step into the KettleEnvironment.init method:

public static void init( boolean simpleJndi ) throws KettleException {
    init( Arrays.asList(
      RowDistributionPluginType.getInstance(),
      StepPluginType.getInstance(),
      StepDialogFragmentType.getInstance(),
      PartitionerPluginType.getInstance(),
      JobEntryPluginType.getInstance(),
      JobEntryDialogFragmentType.getInstance(),
      LogTablePluginType.getInstance(),
      RepositoryPluginType.getInstance(),
      LifecyclePluginType.getInstance(),
      KettleLifecyclePluginType.getInstance(),
      ImportRulePluginType.getInstance(),
      CartePluginType.getInstance(),
      CompressionPluginType.getInstance(),
      AuthenticationProviderPluginType.getInstance(),
      AuthenticationConsumerPluginType.getInstance(),
      EnginePluginType.getInstance()
    ), simpleJndi );
  }

This initializes a set of classes that define the plugin types.

  • RowDistributionPluginType.getInstance()
public static RowDistributionPluginType getInstance() {
  if ( pluginType == null ) {
    pluginType = new RowDistributionPluginType();
  }
  return pluginType;
}

In the singleton pattern, lazy initialization with double-checked locking can go wrong because of instruction reordering. One solution is to rely on class initialization. The JVM initializes a class during the class-initialization phase (after the class is loaded and before any thread uses it), and while doing so it acquires a lock that synchronizes multiple threads trying to initialize the same class. This is known as the Initialization On Demand Holder idiom.
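A minimal sketch of the Initialization On Demand Holder idiom is given here for comparison. The class name LazyPluginType and the constructed counter are hypothetical and exist only for the demonstration; this is not how the Kettle classes are written.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a plugin-type singleton.
final class LazyPluginType {

  // Counts constructor calls; for demonstration only.
  static final AtomicInteger constructed = new AtomicInteger();

  private LazyPluginType() {
    constructed.incrementAndGet();
  }

  // Holder is initialized by the JVM on the first call to getInstance(),
  // under the class-initialization lock, so INSTANCE is created exactly once.
  private static final class Holder {
    static final LazyPluginType INSTANCE = new LazyPluginType();
  }

  static LazyPluginType getInstance() {
    return Holder.INSTANCE;
  }

  public static void main( String[] args ) throws InterruptedException {
    Thread[] threads = new Thread[8];
    for ( int i = 0; i < threads.length; i++ ) {
      threads[i] = new Thread( LazyPluginType::getInstance );
      threads[i].start();
    }
    for ( Thread t : threads ) {
      t.join();
    }
    System.out.println( "constructor calls: " + constructed.get() );
  }
}
```

No explicit synchronization or volatile field is needed, because the guarantee comes from the JVM's class-initialization rules.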

  • initialized.compareAndSet( null, ready = SettableFuture.create() )

Inside the init method, a CAS operation ensures that only one thread runs the initialization (the call method); the other competing threads block until the initializing thread has finished. initialized is a variable of type AtomicReference. volatile guarantees visibility for a single variable but cannot make a compound check-then-set atomic; that requires either locking or an AtomicReference variable.
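The CAS-guarded initialization described above can be sketched with JDK classes only. Note the substitution: CompletableFuture is used here in place of Guava's SettableFuture, and the class name OneTimeInit and the runs counter are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch class; not part of Kettle.
class OneTimeInit {

  private final AtomicReference<CompletableFuture<Void>> initialized = new AtomicReference<>( null );

  // Counts how often the initialization work completed; for demonstration only.
  final AtomicInteger runs = new AtomicInteger();

  void init( Runnable work ) {
    CompletableFuture<Void> ready = new CompletableFuture<>();
    if ( initialized.compareAndSet( null, ready ) ) {
      // Only the thread that wins the CAS performs the work.
      try {
        work.run();
        runs.incrementAndGet();
        ready.complete( null );
      } catch ( Throwable t ) {
        ready.completeExceptionally( t );
      }
    }
    // Winner and losers alike wait on the future stored by the winner.
    initialized.get().join();
  }

  public static void main( String[] args ) throws InterruptedException {
    OneTimeInit once = new OneTimeInit();
    Thread[] threads = new Thread[4];
    for ( int i = 0; i < threads.length; i++ ) {
      threads[i] = new Thread( () -> once.init( () -> System.out.println( "initializing" ) ) );
      threads[i].start();
    }
    for ( Thread t : threads ) {
      t.join();
    }
    System.out.println( "initialization runs: " + once.runs.get() );
  }
}
```

If the work fails, join() rethrows the failure wrapped in a CompletionException for every caller, so no thread proceeds with a half-initialized state.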

Execution then reaches the following code:

pluginClasses.forEach( PluginRegistry::addPluginType );
PluginRegistry.init();

The code above adds the plugin types passed in by the KettleEnvironment.init method (RowDistributionPluginType, …) to pluginTypes and then calls the PluginRegistry.init() method:

...
for ( final PluginTypeInterface pluginType : pluginTypes ) {
  log.snap( Metrics.METRIC_PLUGIN_REGISTRY_PLUGIN_TYPE_REGISTRATION_START, pluginType.getName() );
  registry.registerType( pluginType );
  log.snap( Metrics.METRIC_PLUGIN_REGISTRY_PLUGIN_TYPE_REGISTRATION_STOP, pluginType.getName() );
}
log.snap( Metrics.METRIC_PLUGIN_REGISTRY_PLUGIN_REGISTRATION_STOP );
...

When pluginType is StepPluginType, registry.registerType( pluginType ) is the method we care about. It eventually calls:

public void searchPlugins() throws KettlePluginException {
  registerNatives();
  registerPluginJars();
  registerXmlPlugins();
}

There are two ways to extend Step

Built-in Step loading

The source code of registerNatives is as follows:

protected void registerNatives() throws KettlePluginException {
  // Scan the native steps...
  // StepPluginType.getXmlPluginFile(): xmlFile = "kettle-steps.xml"
  String xmlFile = getXmlPluginFile();
  String alternative = null;
  if ( !Utils.isEmpty( getAlternativePluginFile() ) ) {
    alternative = getPropertyExternal( getAlternativePluginFile(), null );
    if ( !Utils.isEmpty( alternative ) ) {
      xmlFile = alternative;
    }
  }

  // Load the plugins for this file...
  //
  InputStream inputStream = null;
  try {
    inputStream = getResAsStreamExternal( xmlFile );
    if ( inputStream == null ) {
      inputStream = getResAsStreamExternal( "/" + xmlFile );
    }

    if ( !Utils.isEmpty( getAlternativePluginFile() ) ) {
      // Retry to load a regular file...
      if ( inputStream == null && !Utils.isEmpty( alternative ) ) {
        try {
          inputStream = getFileInputStreamExternal( xmlFile );
        } catch ( Exception e ) {
          throw new KettlePluginException( "Unable to load native plugins '" + xmlFile + "'", e );
        }
      }
    }

    if ( inputStream == null ) {
      if ( isReturn() ) {
        return;
      } else {
        throw new KettlePluginException( "Unable to find native plugins definition file: " + xmlFile );
      }
    }

    // Register the plugins defined in the XML resource read from the InputStream
    registerPlugins( inputStream );

  } catch ( KettleXMLException e ) {
    throw new KettlePluginException( "Unable to read the kettle XML config file: " + xmlFile, e );
  } finally {
    IOUtils.closeQuietly( inputStream );
  }
}

registerPlugins

protected void registerPlugins( InputStream inputStream ) throws KettlePluginException, KettleXMLException {
  // Parse the XML document
  Document document = XMLHandler.loadXMLFile( inputStream, null, true, false );

  Node repsNode = XMLHandler.getSubNode( document, getMainTag() );
  List<Node> repsNodes = XMLHandler.getNodes( repsNode, getSubTag() );

  for ( Node repNode : repsNodes ) {
    // Parse the node data, instantiate a PluginInterface and register it with the PluginRegistry
    registerPluginFromXmlResource( repNode, getPath(), this.getClass(), true, null );
  }
}
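The "main tag, then sub tags" traversal in registerPlugins can be illustrated with the JDK's own DOM parser. This is only a sketch under assumptions: the class XmlPluginListSketch, the method readPluginIds, the id attribute and the sample XML are all hypothetical and do not claim to reproduce the real layout of kettle-steps.xml or the behavior of XMLHandler.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical sketch class; not part of Kettle.
class XmlPluginListSketch {

  // Returns the "id" attribute of every <subTag> element found below the root,
  // after checking that the root element is named mainTag.
  static List<String> readPluginIds( InputStream in, String mainTag, String subTag ) {
    Document document;
    try {
      document = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse( in );
    } catch ( Exception e ) {
      throw new IllegalStateException( "Unable to read plugin definitions", e );
    }
    Element root = document.getDocumentElement();
    if ( !root.getTagName().equals( mainTag ) ) {
      throw new IllegalArgumentException( "Unexpected root element: " + root.getTagName() );
    }
    NodeList nodes = root.getElementsByTagName( subTag );
    List<String> ids = new ArrayList<>();
    for ( int i = 0; i < nodes.getLength(); i++ ) {
      ids.add( ( (Element) nodes.item( i ) ).getAttribute( "id" ) );
    }
    return ids;
  }

  public static void main( String[] args ) {
    // Made-up sample document, for illustration only.
    String xml = "<steps><step id=\"DemoA\"/><step id=\"DemoB\"/></steps>";
    InputStream in = new ByteArrayInputStream( xml.getBytes( StandardCharsets.UTF_8 ) );
    System.out.println( readPluginIds( in, "steps", "step" ) );
  }
}
```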

Step loading in plug-in mode

registerPluginJars();

protected void registerPluginJars() throws KettlePluginException {
  List<JarFileAnnotationPlugin> jarFilePlugins = findAnnotatedClassFiles( pluginType.getName() );
  for ( JarFileAnnotationPlugin jarFilePlugin : jarFilePlugins ) {

    URLClassLoader urlClassLoader =
      createUrlClassLoader( jarFilePlugin.getJarFile(), getClass().getClassLoader() );

    try {
      Class<?> clazz = urlClassLoader.loadClass( jarFilePlugin.getClassName() );
      if ( clazz == null ) {
        throw new KettlePluginException( "Unable to load class: " + jarFilePlugin.getClassName() );
      }
      List<String> libraries = Arrays.stream( urlClassLoader.getURLs() )
        .map( URL::getFile )
        .collect( Collectors.toList() );
      Annotation annotation = clazz.getAnnotation( pluginType );

      // Handle an annotated plugin: parse the jar classes that carry the @Step annotation
      handlePluginAnnotation( clazz, annotation, libraries, false, jarFilePlugin.getPluginFolder() );
    } catch ( Exception e ) {
      // Ignore for now, don't know if it's even possible.
      LogChannel.GENERAL.logError(
        "Unexpected error registering jar plugin file: " + jarFilePlugin.getJarFile(), e );
    } finally {
      if ( urlClassLoader != null && urlClassLoader instanceof KettleURLClassLoader ) {
        ( (KettleURLClassLoader) urlClassLoader ).closeClassLoader();
      }
    }
  }
}
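The core of registerPluginJars is "load a class by name, then read its plugin annotation". A small self-contained sketch of that step follows. The annotation @DemoStep and all class names here are hypothetical stand-ins; Kettle's real @Step annotation has its own attributes, which are not reproduced.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical sketch class; not part of Kettle.
class AnnotatedPluginSketch {

  // Hypothetical stand-in for a plugin annotation such as @Step.
  // RUNTIME retention is required for getAnnotation() to see it.
  @Retention( RetentionPolicy.RUNTIME )
  @Target( ElementType.TYPE )
  @interface DemoStep {
    String id();
    String name();
  }

  @DemoStep( id = "DemoUpper", name = "Upper-case demo" )
  static class UpperCaseStep { }

  static class NotAPlugin { }

  // Returns "id: name" when the class carries @DemoStep, otherwise null.
  static String describe( Class<?> clazz ) {
    DemoStep annotation = clazz.getAnnotation( DemoStep.class );
    if ( annotation == null ) {
      return null;
    }
    return annotation.id() + ": " + annotation.name();
  }

  public static void main( String[] args ) throws ClassNotFoundException {
    // Load the class by name through a class loader, then inspect the annotation.
    ClassLoader loader = AnnotatedPluginSketch.class.getClassLoader();
    Class<?> clazz = loader.loadClass( UpperCaseStep.class.getName() );
    System.out.println( describe( clazz ) );
    System.out.println( describe( NotAPlugin.class ) );
  }
}
```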

registerXmlPlugins();

This is not implemented by default.