preface

This article takes a close look at Storm's submitTopology.

topology submission log sample

    2018-10-08 17:32:55.738  INFO 2870 --- [main] org.apache.storm.StormSubmitter          : Generated ZooKeeper secret payload for MD5-digest: -8659577410336375158:-6351873438041855318
    2018-10-08 17:32:55.893  INFO 2870 --- [main] org.apache.storm.utils.NimbusClient      : Found leader nimbus : a391f7a04044:6627
    2018-10-08 17:32:56.059  INFO 2870 --- [main] o.a.storm.security.auth.AuthUtils        : Got AutoCreds []
    2018-10-08 17:32:56.073  INFO 2870 --- [main] org.apache.storm.utils.NimbusClient      : Found leader nimbus : a391f7a04044:6627
    2018-10-08 17:32:56.123  INFO 2870 --- [main] org.apache.storm.StormSubmitter          : Uploading dependencies - jars...
    2018-10-08 17:32:56.125  INFO 2870 --- [main] org.apache.storm.StormSubmitter          : Uploading dependencies - artifacts...
    2018-10-08 17:32:56.125  INFO 2870 --- [main] org.apache.storm.StormSubmitter          : Dependency Blob keys - jars : [] / artifacts : []
    2018-10-08 17:32:56.149  INFO 2870 --- [main] org.apache.storm.StormSubmitter          : Uploading topology jar /tmp/storm-demo/target/storm-demo-0.0.1-SNAPSHOT.jar to assigned location: /data/nimbus/inbox/stormjar-a3ead82b-b474-45a3-aca4-3af2f1d23998.jar
    2018-10-08 17:32:57.105  INFO 2870 --- [main] org.apache.storm.StormSubmitter          : Successfully uploaded topology jar to assigned location: /data/nimbus/inbox/stormjar-a3ead82b-b474-45a3-aca4-3af2f1d23998.jar
    2018-10-08 17:32:57.106  INFO 2870 --- [main] org.apache.storm.StormSubmitter          : Submitting topology DemoTopology in distributed mode with conf {"nimbus.seeds":["192.168.99.100"],"storm.zookeeper.topology.auth.scheme":"digest","topology.workers":1,"storm.zookeeper.port":2181,"nimbus.thrift.port":6627,"storm.zookeeper.topology.auth.payload":"-8659577410336375158:-6351873438041855318","storm.zookeeper.servers":["192.168.99.100"]}
    2018-10-08 17:32:58.008  INFO 2870 --- [main] org.apache.storm.StormSubmitter          : Finished submitting topology: DemoTopology
  • From the log you can see that the topology jar is uploaded to nimbus at /data/nimbus/inbox/stormjar-a3ead82b-b474-45a3-aca4-3af2f1d23998.jar; a sketch of the submitting side follows.
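
For context, a minimal sketch of code that would produce a log like the one above. This is an assumption-laden example, not taken from the article's project: TestWordSpout ships with storm-core, and the class must be launched through the storm jar script so that the storm.jar system property points at the jar to upload.

    import java.util.Arrays;

    import org.apache.storm.Config;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.testing.TestWordSpout;
    import org.apache.storm.topology.TopologyBuilder;

    public class DemoTopology {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            // TestWordSpout is a stand-in; a real topology would add bolts here
            builder.setSpout("word", new TestWordSpout(), 1);

            Config conf = new Config();
            conf.setNumWorkers(1);
            conf.put(Config.NIMBUS_SEEDS, Arrays.asList("192.168.99.100"));
            conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("192.168.99.100"));

            // must be launched via `storm jar` so System.getProperty("storm.jar") is set
            StormSubmitter.submitTopology("DemoTopology", conf, builder.createTopology());
        }
    }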

StormSubmitter

submitTopology

storm-core-1.1.0-sources.jar!/org/apache/storm/StormSubmitter.java

    public static void submitTopology(String name, Map stormConf, StormTopology topology)
            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        submitTopology(name, stormConf, topology, null, null);
    }

    public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts,
             ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        submitTopologyAs(name, stormConf, topology, opts, progressListener, null);
    }

    public static void submitTopologyAs(String name, Map stormConf, StormTopology topology, SubmitOptions opts, ProgressListener progressListener, String asUser)
            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, IllegalArgumentException {
        if (!Utils.isValidConf(stormConf)) {
            throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
        }
        stormConf = new HashMap(stormConf);
        stormConf.putAll(Utils.readCommandLineOpts());
        Map conf = Utils.readStormConfig();
        conf.putAll(stormConf);
        stormConf.putAll(prepareZookeeperAuthentication(conf));

        validateConfs(conf, topology);

        Map<String,String> passedCreds = new HashMap<>();
        if (opts != null) {
            Credentials tmpCreds = opts.get_creds();
            if (tmpCreds != null) {
                passedCreds = tmpCreds.get_creds();
            }
        }
        Map<String,String> fullCreds = populateCredentials(conf, passedCreds);
        if (!fullCreds.isEmpty()) {
            if (opts == null) {
                opts = new SubmitOptions(TopologyInitialStatus.ACTIVE);
            }
            opts.set_creds(new Credentials(fullCreds));
        }
        try {
            if (localNimbus != null) {
                LOG.info("Submitting topology " + name + " in local mode");
                if (opts != null) {
                    localNimbus.submitTopologyWithOpts(name, stormConf, topology, opts);
                } else {
                    // this is for backwards compatibility
                    localNimbus.submitTopology(name, stormConf, topology);
                }
                LOG.info("Finished submitting topology: " +  name);
            } else {
                String serConf = JSONValue.toJSONString(stormConf);
                try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)) {
                    if (topologyNameExists(name, client)) {
                        throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
                    }

                    // Dependency uploading only makes sense for distributed mode
                    List<String> jarsBlobKeys = Collections.emptyList();
                    List<String> artifactsBlobKeys;

                    DependencyUploader uploader = new DependencyUploader();
                    try {
                        uploader.init();

                        jarsBlobKeys = uploadDependencyJarsToBlobStore(uploader);

                        artifactsBlobKeys = uploadDependencyArtifactsToBlobStore(uploader);
                    } catch (Throwable e) {
                        // remove uploaded jars blobs, not artifacts since they're shared across the cluster
                        uploader.deleteBlobs(jarsBlobKeys);
                        uploader.shutdown();
                        throw e;
                    }

                    try {
                        setDependencyBlobsToTopology(topology, jarsBlobKeys, artifactsBlobKeys);
                        submitTopologyInDistributeMode(name, topology, opts, progressListener, asUser, conf, serConf, client);
                    } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                        // remove uploaded jars blobs, not artifacts since they're shared across the cluster
                        // Note that we don't handle TException to delete jars blobs
                        // because it's safer to leave some blobs instead of topology not running
                        uploader.deleteBlobs(jarsBlobKeys);
                        throw e;
                    } finally {
                        uploader.shutdown();
                    }
                }
            }
        } catch(TException e) {
            throw new RuntimeException(e);
        }
        invokeSubmitterHook(name, asUser, conf, topology);

    }

    private static void submitTopologyInDistributeMode(String name, StormTopology topology, SubmitOptions opts,
                                                       ProgressListener progressListener, String asUser, Map conf,
                                                       String serConf, NimbusClient client) throws TException {
        try {
            String jar = submitJarAs(conf, System.getProperty("storm.jar"), progressListener, client);
            LOG.info("Submitting topology {} in distributed mode with conf {}", name, serConf);

            if (opts != null) {
                client.getClient().submitTopologyWithOpts(name, jar, serConf, topology, opts);
            } else {
                // this is for backwards compatibility
                client.getClient().submitTopology(name, jar, serConf, topology);
            }
            LOG.info("Finished submitting topology: {}", name);
        } catch (InvalidTopologyException e) {
            LOG.warn("Topology submission exception: {}", e.get_msg());
            throw e;
        } catch (AlreadyAliveException e) {
            LOG.warn("Topology already alive exception", e);
            throw e;
        }
    }

    public static String submitJarAs(Map conf, String localJar, ProgressListener listener, NimbusClient client) {
        if (localJar == null) {
            throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
        }

        try {
            String uploadLocation = client.getClient().beginFileUpload();
            LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation);
            BufferFileInputStream is = new BufferFileInputStream(localJar, THRIFT_CHUNK_SIZE_BYTES);

            long totalSize = new File(localJar).length();
            if (listener != null) {
                listener.onStart(localJar, uploadLocation, totalSize);
            }

            long bytesUploaded = 0;
            while(true) {
                byte[] toSubmit = is.read();
                bytesUploaded += toSubmit.length;
                if (listener != null) {
                    listener.onProgress(localJar, uploadLocation, bytesUploaded, totalSize);
                }

                if(toSubmit.length==0) break;
                client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
            }
            client.getClient().finishFileUpload(uploadLocation);

            if (listener != null) {
                listener.onCompleted(localJar, uploadLocation, totalSize);
            }

            LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);
            return uploadLocation;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
  • The submission flow funnels into the submitTopologyAs method
  • submitTopologyAs first uploads the dependencies via DependencyUploader, then calls submitTopologyInDistributeMode, which uploads the topology jar itself via submitJarAs (see the ProgressListener sketch below)
  • As the earlier log shows, the jar lands on nimbus at /data/nimbus/inbox/stormjar-a3ead82b-b474-45a3-aca4-3af2f1d23998.jar
  • client.getClient().submitTopology then submits the topology itself (name, uploaded jar location, serialized conf and topology structure) to nimbus
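
Since submitJarAs invokes the ProgressListener callbacks during the upload, a caller can observe the jar transfer. A sketch continuing the DemoTopology example above, with the callback signatures taken from the submitJarAs code:

    StormSubmitter.submitTopology("DemoTopology", conf, builder.createTopology(), null,
            new StormSubmitter.ProgressListener() {
                @Override
                public void onStart(String srcFile, String targetFile, long totalBytes) {
                    System.out.println("uploading " + srcFile + " -> " + targetFile);
                }

                @Override
                public void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes) {
                    System.out.println("uploaded " + bytesUploaded + "/" + totalBytes + " bytes");
                }

                @Override
                public void onCompleted(String srcFile, String targetFile, long totalBytes) {
                    System.out.println("finished uploading to " + targetFile);
                }
            });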

uploadDependencyJarsToBlobStore

storm-core-1.1.0-sources.jar!/org/apache/storm/StormSubmitter.java

    private static List<String> uploadDependencyJarsToBlobStore(DependencyUploader uploader) {
        LOG.info("Uploading dependencies - jars...");

        DependencyPropertiesParser propertiesParser = new DependencyPropertiesParser();

        String depJarsProp = System.getProperty("storm.dependency.jars", "");
        List<File> depJars = propertiesParser.parseJarsProperties(depJarsProp);

        try {
            return uploader.uploadFiles(depJars, true);
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
    }
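
storm.dependency.jars carries a comma-separated list of local jar paths; it is normally populated by the --jars option of the storm jar command, which arrived together with this dependency feature in 1.1.0. A hedged re-implementation of the parsing step, assuming simple comma splitting (consistent with the empty-string default above), not the library's actual code:

    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;

    public class DepJarsParseSketch {
        // hedged sketch of parseJarsProperties: split the comma-separated
        // storm.dependency.jars value into File objects
        static List<File> parseJarsProperties(String prop) {
            List<File> files = new ArrayList<>();
            if (prop != null && !prop.trim().isEmpty()) {
                for (String path : prop.split(",")) {
                    files.add(new File(path.trim()));
                }
            }
            return files;
        }

        public static void main(String[] args) {
            // e.g. launched as: storm jar app.jar Main --jars "dep1.jar,/libs/dep2.jar"
            System.out.println(parseJarsProperties("dep1.jar,/libs/dep2.jar"));
        }
    }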

uploadDependencyArtifactsToBlobStore

storm-core-1.1.0-sources.jar!/org/apache/storm/StormSubmitter.java

    private static List<String> uploadDependencyArtifactsToBlobStore(DependencyUploader uploader) {
        LOG.info("Uploading dependencies - artifacts...");

        DependencyPropertiesParser propertiesParser = new DependencyPropertiesParser();

        String depArtifactsProp = System.getProperty("storm.dependency.artifacts", "{}");
        Map<String, File> depArtifacts = propertiesParser.parseArtifactsProperties(depArtifactsProp);

        try {
            return uploader.uploadArtifacts(depArtifacts);
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
    }
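
Judging from the "{}" default and the Map<String, File> result type, storm.dependency.artifacts appears to carry a JSON object mapping artifact coordinates to already-resolved local files. A hedged sketch of such parsing, using the same json-simple JSONValue seen earlier in submitTopologyAs (this is an assumption about the format, not the library's actual parser):

    import java.io.File;
    import java.util.HashMap;
    import java.util.Map;

    import org.json.simple.JSONValue;

    public class DepArtifactsParseSketch {
        // hedged sketch: parse {"group:artifact:version": "/path/to.jar", ...} into Map<String, File>
        static Map<String, File> parseArtifactsProperties(String prop) {
            @SuppressWarnings("unchecked")
            Map<String, String> raw = (Map<String, String>) JSONValue.parse(prop);
            Map<String, File> artifacts = new HashMap<>();
            for (Map.Entry<String, String> entry : raw.entrySet()) {
                artifacts.put(entry.getKey(), new File(entry.getValue()));
            }
            return artifacts;
        }

        public static void main(String[] args) {
            System.out.println(parseArtifactsProperties(
                    "{\"redis.clients:jedis:2.9.0\":\"/home/user/.m2/jedis-2.9.0.jar\"}"));
        }
    }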

DependencyUploader

storm-core-1.1.0-sources.jar!/org/apache/storm/dependency/DependencyUploader.java

    public List<String> uploadFiles(List<File> dependencies, boolean cleanupIfFails) throws IOException, AuthorizationException {
        checkFilesExist(dependencies);

        List<String> keys = new ArrayList<>(dependencies.size());
        try {
            for (File dependency : dependencies) {
                String fileName = dependency.getName();
                String key = BlobStoreUtils.generateDependencyBlobKey(BlobStoreUtils.applyUUIDToFileName(fileName));

                try {
                    uploadDependencyToBlobStore(key, dependency);
                } catch (KeyAlreadyExistsException e) {
                    // it should never happened since we apply UUID
                    throw new RuntimeException(e);
                }

                keys.add(key);
            }
        } catch (Throwable e) {
            if (getBlobStore() != null && cleanupIfFails) {
                deleteBlobs(keys);
            }
            throw new RuntimeException(e);
        }

        return keys;
    }

    public List<String> uploadArtifacts(Map<String, File> artifacts) {
        checkFilesExist(artifacts.values());

        List<String> keys = new ArrayList<>(artifacts.size());
        try {
            for (Map.Entry<String, File> artifactToFile : artifacts.entrySet()) {
                String artifact = artifactToFile.getKey();
                File dependency = artifactToFile.getValue();

                String key = BlobStoreUtils.generateDependencyBlobKey(convertArtifactToJarFileName(artifact));
                try {
                    uploadDependencyToBlobStore(key, dependency);
                } catch (KeyAlreadyExistsException e) {
                    // we lose the race, but it doesn't matter
                }

                keys.add(key);
            }
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }

        return keys;
    }

    private boolean uploadDependencyToBlobStore(String key, File dependency)
            throws KeyAlreadyExistsException, AuthorizationException, IOException {
        boolean uploadNew = false;

        try {
            // FIXME: we can filter by listKeys() with local blobstore when STORM-1986 is going to be resolved
            // as a workaround, we call getBlobMeta() for all keys
            getBlobStore().getBlobMeta(key);
        } catch (KeyNotFoundException e) {
            // TODO: do we want to add ACL here?
            AtomicOutputStream blob = getBlobStore()
                    .createBlob(key, new SettableBlobMeta(new ArrayList<AccessControl>()));
            Files.copy(dependency.toPath(), blob);
            blob.close();

            uploadNew = true;
        }

        return uploadNew;
    }
  • Both uploadFiles and uploadArtifacts ultimately call the uploadDependencyToBlobStore method
  • uploadDependencyToBlobStore writes the file into the AtomicOutputStream obtained from the blob store (see the UUID-naming sketch below)
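
The "it should never happened since we apply UUID" comment in uploadFiles refers to the UUID that applyUUIDToFileName mixes into each file name before the blob key is generated. A hedged sketch of what that renaming presumably looks like (the exact naming scheme is an assumption, not the library's code):

    import java.util.UUID;

    public class BlobKeyNamingSketch {
        // hedged sketch of applyUUIDToFileName: mix a random UUID into the file name
        // (before the extension) so two uploads of the same jar never share a key
        static String applyUUIDToFileName(String fileName) {
            int dot = fileName.lastIndexOf('.');
            if (dot < 0) {
                return fileName + "-" + UUID.randomUUID();
            }
            return fileName.substring(0, dot) + "-" + UUID.randomUUID() + fileName.substring(dot);
        }

        public static void main(String[] args) {
            // e.g. dep.jar -> dep-3f8c....jar
            System.out.println(applyUUIDToFileName("dep.jar"));
        }
    }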

NimbusUploadAtomicOutputStream

storm-core-1.1.0-sources.jar!/org/apache/storm/blobstore/NimbusBlobStore.java

    public class NimbusUploadAtomicOutputStream extends AtomicOutputStream {
        private String session;
        private int maxChunkSize = 4096;
        private String key;

        public NimbusUploadAtomicOutputStream(String session, int bufferSize, String key) {
            this.session = session;
            this.maxChunkSize = bufferSize;
            this.key = key;
        }

        @Override
        public void cancel() throws IOException {
            try {
                synchronized(client) {
                    client.getClient().cancelBlobUpload(session);
                }
            } catch (TException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void write(int b) throws IOException {
            try {
                synchronized(client) {
                    client.getClient().uploadBlobChunk(session, ByteBuffer.wrap(new byte[] {(byte)b}));
                }
            } catch (TException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void write(byte []b) throws IOException {
            write(b, 0, b.length);
        }

        @Override
        public void write(byte []b, int offset, int len) throws IOException {
            try {
                int end = offset + len;
                for (int realOffset = offset; realOffset < end; realOffset += maxChunkSize) {
                    int realLen = Math.min(end - realOffset, maxChunkSize);
                    LOG.debug("Writing {} bytes of {} remaining",realLen,(end-realOffset)); synchronized(client) { client.getClient().uploadBlobChunk(session, ByteBuffer.wrap(b, realOffset, realLen)); } } } catch (TException e) { throw new RuntimeException(e); } } @Override public void close() throws IOException { try { synchronized(client) { client.getClient().finishBlobUpload(session); client.getClient().createStateInZookeeper(key); } } catch (TException e) { throw new RuntimeException(e); }}}Copy the code
  • The write methods of NimbusUploadAtomicOutputStream upload the data through client.getClient().uploadBlobChunk, slicing large buffers into maxChunkSize pieces (see the chunking sketch below)
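
To make the chunking concrete, here is the loop from write(byte[], int, int) pulled out on its own: with the default maxChunkSize of 4096, a 10000-byte buffer is sent as three uploadBlobChunk calls of 4096, 4096 and 1808 bytes.

    import java.nio.ByteBuffer;

    public class ChunkingDemo {
        // standalone copy of the slicing logic in NimbusUploadAtomicOutputStream.write(byte[], int, int)
        static void chunk(byte[] b, int offset, int len, int maxChunkSize) {
            int end = offset + len;
            for (int realOffset = offset; realOffset < end; realOffset += maxChunkSize) {
                int realLen = Math.min(end - realOffset, maxChunkSize);
                // in the real stream this buffer is passed to uploadBlobChunk(session, ...)
                ByteBuffer chunk = ByteBuffer.wrap(b, realOffset, realLen);
                System.out.println("chunk of " + chunk.remaining() + " bytes at offset " + realOffset);
            }
        }

        public static void main(String[] args) {
            chunk(new byte[10000], 0, 10000, 4096); // prints 4096, 4096, 1808
        }
    }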

send&recv

storm-core-1.1.0-sources.jar!/org/apache/storm/generated/Nimbus.java

    public String beginFileUpload() throws AuthorizationException, org.apache.thrift.TException
    {
      send_beginFileUpload();
      return recv_beginFileUpload();
    }

    public void send_beginFileUpload() throws org.apache.thrift.TException
    {
      beginFileUpload_args args = new beginFileUpload_args();
      sendBase("beginFileUpload", args);
    }

    public String recv_beginFileUpload() throws AuthorizationException, org.apache.thrift.TException
    {
      beginFileUpload_result result = new beginFileUpload_result();
      receiveBase(result, "beginFileUpload");
      if (result.is_set_success()) {
        return result.success;
      }
      if (result.aze != null) {
        throw result.aze;
      }
      throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "beginFileUpload failed: unknown result");
    }
    }

    public void send_finishFileUpload(String location) throws org.apache.thrift.TException
    {
      finishFileUpload_args args = new finishFileUpload_args();
      args.set_location(location);
      sendBase("finishFileUpload", args);
    }

    public void uploadChunk(String location, ByteBuffer chunk) throws AuthorizationException, org.apache.thrift.TException
    {
      send_uploadChunk(location, chunk);
      recv_uploadChunk();
    }

    public void send_uploadChunk(String location, ByteBuffer chunk) throws org.apache.thrift.TException
    {
      uploadChunk_args args = new uploadChunk_args();
      args.set_location(location);
      args.set_chunk(chunk);
      sendBase("uploadChunk", args);
    }

    public void recv_uploadChunk() throws AuthorizationException, org.apache.thrift.TException
    {
      uploadChunk_result result = new uploadChunk_result();
      receiveBase(result, "uploadChunk");
      if (result.aze != null) {
        throw result.aze;
      }
      return;
    }

    public void submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, org.apache.thrift.TException
    {
      send_submitTopology(name, uploadedJarLocation, jsonConf, topology);
      recv_submitTopology();
    }

    public void send_submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws org.apache.thrift.TException
    {
      submitTopology_args args = new submitTopology_args();
      args.set_name(name);
      args.set_uploadedJarLocation(uploadedJarLocation);
      args.set_jsonConf(jsonConf);
      args.set_topology(topology);
      sendBase("submitTopology", args);
    }

    public void recv_submitTopology() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, org.apache.thrift.TException
    {
      submitTopology_result result = new submitTopology_result();
      receiveBase(result, "submitTopology");
      if (result.e != null) {
        throw result.e;
      }
      if (result.ite != null) {
        throw result.ite;
      }
      if (result.aze != null) {
        throw result.aze;
      }
      return;
    }

    public void uploadBlobChunk(String session, ByteBuffer chunk) throws AuthorizationException, org.apache.thrift.TException
    {
      send_uploadBlobChunk(session, chunk);
      recv_uploadBlobChunk();
    }

    public void send_uploadBlobChunk(String session, ByteBuffer chunk) throws org.apache.thrift.TException
    {
      uploadBlobChunk_args args = new uploadBlobChunk_args();
      args.set_session(session);
      args.set_chunk(chunk);
      sendBase("uploadBlobChunk", args);
    }

    public void recv_uploadBlobChunk() throws AuthorizationException, org.apache.thrift.TException
    {
      uploadBlobChunk_result result = new uploadBlobChunk_result();
      receiveBase(result, "uploadBlobChunk");
      if (result.aze != null) {
        throw result.aze;
      }
      return;
    }
  • sendBase sends the request over the thrift transport, and receiveBase reads back the response; each client method is a send_xxx/recv_xxx pair (see the client sketch below)
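
The same generated client can also be driven directly. A minimal sketch, assuming a storm.yaml that points at the cluster (as read by Utils.readStormConfig); each call is one sendBase/receiveBase round trip:

    import java.util.Map;

    import org.apache.storm.utils.NimbusClient;
    import org.apache.storm.utils.Utils;

    public class NimbusThriftDemo {
        public static void main(String[] args) throws Exception {
            Map conf = Utils.readStormConfig();
            // NimbusClient wraps the generated Nimbus.Client shown above
            try (NimbusClient nimbus = NimbusClient.getConfiguredClient(conf)) {
                String uploadLocation = nimbus.getClient().beginFileUpload();
                System.out.println("assigned upload location: " + uploadLocation);
            }
        }
    }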

summary

Storm's submitTopology first uploads the dependency jars specified by storm.dependency.jars and the artifacts specified by storm.dependency.artifacts, and then uploads the topology jar itself. All of these uploads go through the generated thrift client, which sends requests with sendBase and receives responses with receiveBase.

doc

  • Storm 1.1.0 released