0x00 Summary

In the previous articles, we introduced the overall architecture of PipeDream, the Profile phase, and the computation partitioning phase. In this article we cover the model transformation phase.

Other articles in this pipeline-parallelism series:

Deep Learning Pipeline Parallelism: GPipe (1), Basic Pipeline Implementation

Deep Learning Pipeline Parallelism: GPipe (2), Gradient Accumulation

Deep Learning Pipeline Parallelism: GPipe (3), Recomputation

Deep Learning Pipeline Parallelism: PipeDream (1), Profile Stage

Deep Learning Pipeline Parallelism: PipeDream (2), Computation Partitioning

0x01 Preface

1.1 Improvement

The model transformation phase is one of PipeDream's improvements over GPipe, so let's start by comparing the two.

  • GPipe's pipelining (the assignment of specific model layers to partitions) can be viewed as a preprocessing step, somewhere between dynamic and static, that runs before the application starts, and it is not transparent to the user.
  • PipeDream assigns layers according to the profiling results, packaging all layers of the same stage into a generated PyTorch model Python file. This is also preprocessing, but it is clearly more convenient and easier to understand than GPipe's approach, and users can make further manual adjustments to the generated files.

The basic idea of PipeDream model transformation is as follows:

  • Load the model's DAG into memory from the graph file.
  • Split the overall DAG by stage. Since every layer of the model was already assigned to a stage in the previous article, this step uses partition_graph to extract the layers that belong to each stage.
  • Apply the template file to each stage's subgraph to generate one Python file per subgraph. In the main function, each subgraph is converted into a PyTorch Module in its own Python file, and each layer becomes a submodule of that Module.
  • Fuse the model: merge the per-stage subgraphs back into one overall model. In code terms, a new Python file is generated that imports each subgraph's Python file, yielding a single top-level module file.
  • Write an __init__.py file so the generated package is easy to use.
  • Generate the related configuration files, such as the data-parallel and model-parallel configurations.

The overall flow is shown below:

Model file
+--------+ +--------+ +--------+ +--------+ +--------+ +--------+
| Edge 1 | | Edge 2 | | Edge 3 | | Edge 4 | | Edge 5 | | Edge 6 |
+--------+ +--------+ +--------+ +--------+ +--------+ +--------+
+---------+ +---------+ +---------+ +---------+ +---------+
| Node 1  | | Node 2  | | Node 3  | | Node 4  | | Node 5  |
| stage 1 | | stage 1 | | stage 2 | | stage 2 | | stage 3 |
+----+----+ +----+----+ +----+----+ +----+----+ +----+----+
     |           |           |           |           |
     +-----+-----+           +-----+-----+           |          Subgraphs
           |                       |                 |
           v                       v                 v
    +-------------+         +-------------+   +-------------+
    | Subgraph 1  |         | Subgraph 2  |   | Subgraph 3  |
    |   Node 1    |         |   Node 3    |   |   Node 5    |
    |   Node 2    |         |   Node 4    |   |             |
    +------+------+         +------+------+   +------+------+
           |                       |                 |          Modules
           v                       v                 v
    +-------------+         +-------------+   +-------------+
    |  Module 1   |         |  Module 2   |   |  Module 3   |
    +------+------+         +------+------+   +------+------+
           |                       |                 |          Python files
           v                       v                 v
    +-------------+         +-------------+   +-------------+
    |  stage1.py  |         |  stage2.py  |   |  stage3.py  |
    +------+------+         +------+------+   +------+------+
           |                       |                 |
           +-----------------------+-----------------+
                                   |
                                   v
                           +---------------+
                           |    gnmt.py    |
                           +---------------+


1.2 Review

For better illustration, let's recall the output of the previous article.

The output file is shown below (excerpt). As you can see, it is still a graph, just like the profile output file; the key difference is that every node now carries a stage_id. This article converts this output file into a PyTorch model, that is, a set of Python files.

node4 -- Embedding(32320, 1024, padding_idx=0) -- forward_compute_time=0.073, backward_compute_time=6.949, activation_size=6291456.0, parameter_size=132382720.000 -- stage_id=0
node5 -- EmuBidirLSTM((bidir): LSTM(1024, 1024, bidirectional=True) (layer1): LSTM(1024, 1024) (layer2): LSTM(1024, 1024)) -- forward_compute_time=5.247, backward_compute_time=0.016, activation_size=12582912.0, parameter_size=67174400.000 -- stage_id=1
node6 -- Dropout(p=0.2) -- forward_compute_time=0.077, backward_compute_time=0.196, activation_size=12582912.0, parameter_size=0.000 -- stage_id=1
node7 -- LSTM(2048, 1024) -- forward_compute_time=3.190, backward_compute_time=5.348, activation_size=6553600.0, parameter_size=50364416.000 -- stage_id=2
node8 -- __getitem__(0) -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=6291456.0, parameter_size=0.000 -- stage_id=3
node10 -- Dropout(p=0.2) -- forward_compute_time=0.064, backward_compute_time=0.128, activation_size=6291456.0, parameter_size=0.000 -- stage_id=3
node11 -- LSTM(1024, 1024) -- forward_compute_time=2.491, backward_compute_time=4.203, activation_size=6553600.0, parameter_size=33587200.000 -- stage_id=3
node12 -- __getitem__(0) -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=6291456.0, parameter_size=0.000 -- stage_id=3
node14 -- Add -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=6291456.0, parameter_size=0.000 -- stage_id=4
node15 -- Dropout(p=0.2) -- forward_compute_time=0.059, backward_compute_time=0.121, activation_size=6291456.0, parameter_size=0.000 -- stage_id=4
node16 -- LSTM(1024, 1024) -- forward_compute_time=2.492, backward_compute_time=4.201, activation_size=6553600.0, parameter_size=33587200.000 -- stage_id=4
node17 -- __getitem__(0) -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=6291456.0, parameter_size=0.000 -- stage_id=5
node19 -- Add -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=6291456.0, parameter_size=0.000 -- stage_id=5
node1 -- node4
node4 -- node5
node2 -- node5
node5 -- node6
node6 -- node7
node7 -- node8
node8 -- node10
node10 -- node11
node11 -- node12
node12 -- node14
node8 -- node14
node14 -- node15
node15 -- node16
node16 -- node17
node17 -- node19
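To make the file format concrete, here is a minimal sketch of a parser for the excerpt above, written only from the lines shown here. It assumes that a node line has the form "nodeN -- description -- key=value, ... -- stage_id=K" and that an edge line has the form "nodeA -- nodeB". The names ParsedNode and parse_graph_text are hypothetical; PipeDream itself loads this file with graph.Graph.from_str, whose exact parsing rules may differ.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass
class ParsedNode:
    node_id: str
    node_desc: str
    metrics: Dict[str, float] = field(default_factory=dict)
    stage_id: Optional[int] = None


def parse_graph_text(text: str) -> Tuple[Dict[str, ParsedNode], List[Tuple[str, str]]]:
    """Hypothetical parser for 'nodeN -- desc -- metrics -- stage_id=K' lines."""
    nodes: Dict[str, ParsedNode] = {}
    edges: List[Tuple[str, str]] = []
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line:
            continue
        parts = line.split(" -- ")
        if len(parts) == 2:
            # Edge line: "nodeA -- nodeB".
            edges.append((parts[0], parts[1]))
            continue
        # Node line: id, layer description, metrics, and (after partitioning) stage_id.
        node = ParsedNode(node_id=parts[0], node_desc=parts[1])
        for item in parts[2].split(", "):
            key, value = item.split("=")
            node.metrics[key] = float(value)
        if len(parts) > 3 and parts[3].startswith("stage_id="):
            node.stage_id = int(parts[3][len("stage_id="):])
        nodes[node.node_id] = node
    return nodes, edges


sample = """node4 -- Embedding(32320, 1024, padding_idx=0) -- forward_compute_time=0.073, backward_compute_time=6.949, activation_size=6291456.0, parameter_size=132382720.000 -- stage_id=0
node6 -- Dropout(p=0.2) -- forward_compute_time=0.077, backward_compute_time=0.196, activation_size=12582912.0, parameter_size=0.000 -- stage_id=1
node4 -- node5"""

nodes, edges = parse_graph_text(sample)
print(nodes["node6"].stage_id, edges)  # 1 [('node4', 'node5')]
```

Splitting on " -- " works here because none of the layer descriptions in the excerpt contain that separator; a real implementation would need to guard against it.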

0x02 Model Synthesis

The model-synthesis code lives in optimizer/convert_graph_to_model.py.

2.1 Main Logic

The main logic is as follows:

  • Parse the command-line configuration.
  • Load the graph from the graph file.
  • Partition the graph into a series of subgraphs.
  • Convert the subgraphs into modules.
  • Merge the subgraphs to generate an overall model file.
  • Generate __init__.py.
  • Generate the configuration files.

Here is the source code; we analyze each part in detail afterwards.

if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="Convert profile graphs to generated model description")
    parser.add_argument('-f', "--profile_filename", required=True,
                        help="Input profile filename")
    parser.add_argument("--model_template_filename", default="templates/model.py.template",
                        help="Model template filename")
    parser.add_argument("--init_template_filename", default="templates/__init__.py.template",
                        help="__init__.py template filename")
    parser.add_argument("--conf_template_filename", default="templates/conf.json.template",
                        help="Conf template filename")
    parser.add_argument("--stage_to_num_ranks_map", type=str, default=None,
                        help="Stage split")
    parser.add_argument('-n', "--model_name", required=True,
                        help="Name of model class")
    parser.add_argument('-a', "--arch", required=True,
                        help="Human-readable architecture name")
    parser.add_argument('-o', "--output_directory", required=True,
                        help="Full path of output model directory")
    args = parser.parse_args()

    # mkdir output_directory.
    subprocess.check_output("mkdir -p %s" % args.output_directory, shell=True)

    # Load the graph from the graph file.
    input_node = graph.Node("input_node", node_desc="Input")
    full_graph = graph.Graph.from_str(open(args.profile_filename, 'r').read())
    initialize_weights = (args.arch == "vgg16" or args.arch == "resnet50")
    input_node.stage_id = 0
    sinks = full_graph.sinks()
    # Remove all unneeded sinks that are not used, makes code generation easier.
    for sink in sinks:
        if sink.node_desc.startswith("__getitem__"):
            full_graph.remove_node(sink)

    # Partition the graph into a series of subgraphs.
    subgraphs = full_graph.partition_graph()

    # Convert each subgraph into a module.
    for i, subgraph in enumerate(subgraphs):
        module_name = "Stage%d" % i
        module_filename = "stage%d.py" % i
        if len(subgraphs) == 1:
            module_name = args.model_name
            module_filename = "%s.py" % args.arch
        num_inputs, num_outputs = convert_subgraph_to_module(subgraph, full_graph, len(subgraphs),
                                                             module_name, initialize_weights,
                                                             args.model_template_filename,
                                                             os.path.join(args.output_directory,
                                                                          module_filename))
        print("Done generating %s..." % module_filename)

    # Merge the subgraphs to generate an overall model file.
    model = []
    import_statements = ["from .%s import %s" % (args.arch, args.model_name)]
    pytorch_modules = None
    if len(subgraphs) > 1:
        python_modules, pytorch_modules, subgraph_inputs, subgraph_outputs = \
            fuse_subgraphs_to_module(full_graph, subgraphs, args.model_name,
                                     initialize_weights,
                                     args.model_template_filename,
                                     os.path.join(args.output_directory,
                                                  "%s.py" % args.arch))
        model = ["(%s(), [%s], [%s])" % (x[0],
                                         ", ".join(["\"%s\"" % y for y in x[1]]),
                                         ", ".join(["\"%s\"" % y for y in x[2]]))
                 for x in zip(pytorch_modules, subgraph_inputs,
                              subgraph_outputs)]
        model.append("(criterion, [\"%s\"], [\"loss\"])" % subgraph_outputs[-1][0])
        import_statements.extend(
            ["from .%s import %s" % (python_module, pytorch_module)
             for (python_module, pytorch_module) in zip(python_modules, pytorch_modules)])
    else:
        inputs = ["\"input%d\"" % i for i in range(num_inputs)]
        assert(num_outputs == 1)
        model.append("(%s.%s(), [%s], [\"output\"])" % (args.arch, args.model_name, ", ".join(inputs)))
        model.append("(criterion, [\"output\"], [\"loss\"])")

    # Generate __init__.py.
    with open(os.path.join(args.output_directory, "__init__.py"), 'w') as f1, \
         open(args.init_template_filename, 'r') as f2:
        template = f2.read()
        init = template % {
            "arch": args.arch,
            "import_statements": "\n".join(import_statements),
            "model": ",\n        ".join(model),
            "full_model": "%s()" % args.model_name
        }
        f1.write(init)

    # Generate the configuration files.
    if args.stage_to_num_ranks_map is not None:
        stage_to_num_ranks_map = args.stage_to_num_ranks_map.split(",")
        stage_to_num_ranks_map = [(int(x.split(":")[0]), int(x.split(":")[1]))
                      for x in stage_to_num_ranks_map]
        num_stages = 0
        for (stage_id, replication_factor) in stage_to_num_ranks_map:
            num_stages += replication_factor
        assert(len(stage_to_num_ranks_map) == len(pytorch_modules))
        num_modules = len(pytorch_modules) + 1  # Add 1 for criterion.
    elif pytorch_modules is None:
        num_stages = 1
        num_modules = 2  # Add 1 for criterion.
    else:
        num_stages = len(pytorch_modules)
        num_modules = len(pytorch_modules) + 1  # Add 1 for criterion.
    all_template_args = []
    all_template_args.append({
        "module_to_stage_map": [0] * num_modules,
        "stage_to_rank_map": str({"0": list(range(num_stages))}).replace("'", "\"")
    })
    all_template_args.append({
        "module_to_stage_map": list(range(num_modules-1)) + [num_modules-2],
        "stage_to_rank_map": str({str(i): [i] for i in range(num_modules-1)}).replace("'", "\"")
    })
    if args.stage_to_num_ranks_map is not None:
        stage_to_rank_map = {}
        ranks_so_far = 0
        for i in range(num_modules-1):
            stage_to_rank_map[str(i)] = list(range(ranks_so_far,
                                                   ranks_so_far + stage_to_num_ranks_map[i][1]))
            ranks_so_far += stage_to_num_ranks_map[i][1]
        stage_to_rank_map = str(stage_to_rank_map).replace("'", "\"")
        all_template_args.append({
            "module_to_stage_map": list(range(num_modules-1)) + [num_modules-2],
            "stage_to_rank_map": stage_to_rank_map
        })
    for conf_filename, template_args in zip(
        ["dp_conf.json", "mp_conf.json", "hybrid_conf.json"], all_template_args):
        with open(os.path.join(args.output_directory, conf_filename), 'w') as f1, \
             open(args.conf_template_filename, 'r') as f2:
            template = f2.read()
            conf = template % template_args
            f1.write(conf)
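The configuration branch at the end of main is easiest to follow with a concrete value. The following standalone sketch reproduces, under hypothetical function names that are not part of PipeDream, how a --stage_to_num_ranks_map value such as "0:2,1:1" becomes the hybrid configuration's stage_to_rank_map and module_to_stage_map. As in main, ranks are handed out consecutively by position in the list.

```python
import json
from typing import Dict, List, Tuple


def parse_stage_to_num_ranks(spec: str) -> List[Tuple[int, int]]:
    # "0:2,1:1" -> [(0, 2), (1, 1)], the same parsing as in main().
    return [(int(x.split(":")[0]), int(x.split(":")[1])) for x in spec.split(",")]


def hybrid_stage_to_rank_map(pairs: List[Tuple[int, int]]) -> Dict[str, List[int]]:
    # Give each stage a consecutive block of ranks, as in the hybrid_conf.json branch.
    stage_to_rank_map = {}
    ranks_so_far = 0
    for i, (_, num_ranks) in enumerate(pairs):
        stage_to_rank_map[str(i)] = list(range(ranks_so_far, ranks_so_far + num_ranks))
        ranks_so_far += num_ranks
    return stage_to_rank_map


def module_to_stage_map(num_pytorch_modules: int) -> List[int]:
    # One stage per generated module; the criterion (last entry) shares the final stage.
    return list(range(num_pytorch_modules)) + [num_pytorch_modules - 1]


pairs = parse_stage_to_num_ranks("0:2,1:1")
rank_map = hybrid_stage_to_rank_map(pairs)
print(module_to_stage_map(len(pairs)))  # [0, 1, 1]
print(json.dumps(rank_map))             # {"0": [0, 1], "1": [2]}
# main() builds the same JSON text with str(...).replace("'", "\"").
assert json.dumps(rank_map) == str(rank_map).replace("'", "\"")
```

For these simple dictionaries of strings and integer lists, the str().replace() trick in main produces valid JSON; json.dumps would be the more robust choice if values could themselves contain quotes.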

2.2 Helper Logic

First, let's look at two lists.

  • declaration_whitelist: if a node's description starts with one of these prefixes, no layer declaration is generated for it in __init__; such nodes (for example Add, Size, or Input) are handled directly in forward().
  • declaration_specialcase: if a node's description starts with one of these prefixes, the node needs a special definition, so the converter performs a special transformation, such as adding an import statement and building the layer declaration by hand.

declaration_whitelist = [
    "hidden",
    "__getitem__",
    "Add",
    "Mul",
    "Concat",
    "Input",
    "Size",
    "View",
    "Transpose",
    "self.get_seq_lens"
]

declaration_specialcase = [
    "EmuBidirLSTM",
    "RecurrentAttention",
    "Classifier",
    "MaskConv",
    "ResizeInput",
    "InferenceBatchSoftmax",
    "BatchRNN",
    "SequenceWise"
]
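A minimal sketch of how the two lists steer code generation follows. The helpers classify and emu_bidir_lstm_declaration are hypothetical; they only restate the prefix checks and the EmuBidirLSTM regular expression used inside convert_subgraph_to_module (with the parentheses escaped, which the regex needs in order to match a literal "LSTM(1024, 1024)").

```python
import re

# Copied from the two lists above.
DECLARATION_WHITELIST = ["hidden", "__getitem__", "Add", "Mul", "Concat",
                         "Input", "Size", "View", "Transpose", "self.get_seq_lens"]
DECLARATION_SPECIALCASE = ["EmuBidirLSTM", "RecurrentAttention", "Classifier",
                           "MaskConv", "ResizeInput", "InferenceBatchSoftmax",
                           "BatchRNN", "SequenceWise"]


def classify(node_desc: str) -> str:
    """Hypothetical helper: how the converter treats a node description."""
    if any(node_desc.startswith(p) for p in DECLARATION_WHITELIST):
        return "no declaration"   # handled inline in forward(), no layer in __init__
    if any(node_desc.startswith(p) for p in DECLARATION_SPECIALCASE):
        return "special case"     # extra import plus a hand-built declaration
    return "torch.nn"             # declared as torch.nn.<node_desc>


def emu_bidir_lstm_declaration(node_desc: str) -> str:
    # The greedy leading '.*' makes the search settle on the last 'LSTM(a, b)'.
    m = re.search(r'.*LSTM\((\d+), (\d+)\).*', node_desc)
    return "EmuBidirLSTM(%d, %d)" % (int(m.group(1)), int(m.group(2)))


desc = ("EmuBidirLSTM(  (bidir): LSTM(1024, 1024, bidirectional=True)  "
        "(layer1): LSTM(1024, 1024)  (layer2): LSTM(1024, 1024))")
for d in ["Add", "Dropout(p=0.2)", desc]:
    print(classify(d))
print(emu_bidir_lstm_declaration(desc))  # EmuBidirLSTM(1024, 1024)
```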

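Next, let's look at the get_input_names function.

get_input_names iterates over the nodes of the subgraph to find the subgraph's inputs: predecessors that belong to a different stage (skipping hidden-state inputs), plus any Input nodes that have no predecessors.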

def get_input_names(graph, full_graph, check_stages=True):
    # Figure out the inputs to this sub-graph, which are the predecessors of
    # nodes in the sub-graph not in the sub-graph.
    # input_names is a dict mapping each predecessor's node_id to assigned
    # variable name.
    nodes = graph.nodes
    input_names = {}
    counter = 0
    for node_id in nodes:
        if (node_id in full_graph.in_edges and
                len(full_graph.in_edges[node_id]) > 0):
            for in_node in full_graph.in_edges[node_id]:
                if in_node.stage_id != nodes[node_id].stage_id and check_stages:
                    # Skip hidden inputs.
                    if full_graph.nodes[in_node.node_id].node_desc.startswith("hidden"):
                        continue
                    input_names[in_node.node_id] = "input%d" % counter
                    counter += 1
        else:
            if graph.nodes[node_id].node_desc.startswith("Input"):
                input_names[node_id] = "input%d" % counter
                counter += 1
    return input_names
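To see the rule in action, the sketch below applies the same logic to a toy version of stage 1 from the graph above (node5 and node6, fed by node4 and node2 from stage 0). The Node namedtuple and find_subgraph_inputs are hypothetical stand-ins for PipeDream's graph classes, and check_stages is assumed to be True.

```python
from collections import namedtuple

# Hypothetical stand-in for PipeDream's graph.Node; only these fields matter here.
Node = namedtuple("Node", ["node_id", "node_desc", "stage_id"])


def find_subgraph_inputs(sub_nodes, full_in_edges, all_nodes):
    """Same rule as get_input_names: a predecessor in another stage becomes an input."""
    input_names = {}
    counter = 0
    for node_id, node in sub_nodes.items():
        preds = full_in_edges.get(node_id, [])
        if preds:
            for pred in preds:
                if pred.stage_id != node.stage_id:
                    if all_nodes[pred.node_id].node_desc.startswith("hidden"):
                        continue  # hidden-state inputs are skipped
                    input_names[pred.node_id] = "input%d" % counter
                    counter += 1
        elif node.node_desc.startswith("Input"):
            # A source node of the whole model is also an input.
            input_names[node_id] = "input%d" % counter
            counter += 1
    return input_names


node2 = Node("node2", "Input1", 0)
node4 = Node("node4", "Embedding(32320, 1024, padding_idx=0)", 0)
node5 = Node("node5", "EmuBidirLSTM(...)", 1)
node6 = Node("node6", "Dropout(p=0.2)", 1)
all_nodes = {n.node_id: n for n in (node2, node4, node5, node6)}
in_edges = {"node5": [node4, node2], "node6": [node5]}

# Stage 1 contains node5 and node6; both of its inputs come from stage 0.
print(find_subgraph_inputs({"node5": node5, "node6": node6}, in_edges, all_nodes))
# {'node4': 'input0', 'node2': 'input1'}
```

The numbering of input0, input1, ... follows dictionary iteration order, which is why a generated forward() signature can look like forward(self, input1, input0).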

0x03 Model Transformation

Next, let's look at the model transformation in detail.

3.1 Separating Subgraphs

First, the main function splits the graph into subgraphs by stage.

Since every layer of the model was already assigned to a stage in the previous article, this step uses partition_graph to extract the layers that belong to each stage.

input_node = graph.Node("input_node", node_desc="Input")
full_graph = graph.Graph.from_str(open(args.profile_filename, 'r').read())
initialize_weights = (args.arch == "vgg16" or args.arch == "resnet50")
input_node.stage_id = 0
sinks = full_graph.sinks()
# Remove all unneeded sinks that are not used, makes code generation easier.
for sink in sinks:
    if sink.node_desc.startswith("__getitem__"):
        full_graph.remove_node(sink)
subgraphs = full_graph.partition_graph()

partition_graph works as follows:

  • Walk through the nodes to collect all stage IDs.
  • For each stage ID, build a subgraph: partition_graph_helper collects the nodes whose stage_id matches, and keeps only the edges whose two endpoints both belong to that stage.
def partition_graph(self):
    stage_ids = set()
    # Find all stages.
    for node_id in self.nodes:
        stage_ids.add(self.nodes[node_id].stage_id)
    # stage_ids = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
    if len(stage_ids) == 1:
        return [self.copy()]
    subgraphs = []
    # Build one subgraph per stage.
    for stage_id in stage_ids:
        subgraphs.append(self.partition_graph_helper(stage_id))
    return subgraphs

# Build the subgraph for a given stage.
def partition_graph_helper(self, stage_id):
    subgraph = Graph()
    for node1_id in self.nodes:
        if self.nodes[node1_id].stage_id == stage_id:
            subgraph.add_node(self.nodes[node1_id])
            if node1_id not in self.edges: continue
            for node2 in self.edges[node1_id]:
                if node2.stage_id == stage_id:
                    subgraph.add_edge(self.nodes[node1_id], node2)
    return subgraph
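The same idea can be sketched with plain dictionaries. partition_by_stage below is a hypothetical simplification, not PipeDream's Graph class: it groups node IDs by stage and keeps only intra-stage edges, while cross-stage edges are left for the input/output detection that happens later during conversion.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def partition_by_stage(stage_of: Dict[str, int], edges: List[Tuple[str, str]]):
    """Group nodes by stage_id and keep only edges whose endpoints share a stage."""
    nodes_per_stage = defaultdict(list)
    for node_id, stage_id in stage_of.items():
        nodes_per_stage[stage_id].append(node_id)

    edges_per_stage = defaultdict(list)
    for src, dst in edges:
        if stage_of[src] == stage_of[dst]:
            edges_per_stage[stage_of[src]].append((src, dst))
        # Cross-stage edges are dropped here; they later become module inputs/outputs.

    return {s: (nodes_per_stage[s], edges_per_stage[s]) for s in sorted(nodes_per_stage)}


stage_of = {"node1": 0, "node2": 0, "node4": 0, "node5": 1, "node6": 1, "node7": 2}
edges = [("node1", "node4"), ("node4", "node5"), ("node2", "node5"),
         ("node5", "node6"), ("node6", "node7")]
for stage_id, (stage_nodes, kept_edges) in partition_by_stage(stage_of, edges).items():
    print(stage_id, stage_nodes, kept_edges)
# 0 ['node1', 'node2', 'node4'] [('node1', 'node4')]
# 1 ['node5', 'node6'] [('node5', 'node6')]
# 2 ['node7'] []
```

Unlike the original, this sketch sorts the stage IDs and does not special-case a single-stage graph (where partition_graph simply returns a copy).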

The resulting subgraphs, as seen in a debugger:

subgraphs = {list: 8}
 00 = {Graph}
  'node4' = {Node} node4 -- Embedding(32320, 1024, padding_idx=0) -- forward_compute_time=0.073, backward_compute_time=6.949, activation_size=6291456.0, parameter_size=132382720.000 -- stage_id=0
  'node1' = {Node} node1 -- Input0 -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=0.0, parameter_size=0.000 -- stage_id=0
  'node2' = {Node} node2 -- Input1 -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=0.0, parameter_size=0.000 -- stage_id=0
  'node3' = {Node} node3 -- Input2 -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=0.0, parameter_size=0.000 -- stage_id=0
  __len__ = {int} 4
 01 = {Graph} node5
  edges = {dict: 1} {'node5': [<graph.graph.Node object at 0x7f9c5be91438>]}
  in_edges = {dict: 1} {'node6': [<graph.graph.Node object at 0x7f9c5be91470>]}
  nodes = {dict: 2} {'node5': <graph.graph.Node object at 0x7f9c5be91470>, 'node6': <graph.graph.Node object at 0x7f9c5be91438>}
  'node5' = {Node} node5 -- EmuBidirLSTM( (bidir): LSTM(1024, 1024, bidirectional=True) (layer1): LSTM(1024, 1024) (layer2): LSTM(1024, 1024)) -- forward_compute_time=5.247, backward_compute_time=0.016, activation_size=12582912.0, parameter_size=67174400.000 -- stage_id=1
  'node6' = {Node} node6 -- Dropout(p=0.2) -- forward_compute_time=0.077, backward_compute_time=0.196, activation_size=12582912.0, parameter_size=0.000 -- stage_id=1
  __len__ = {int} 2
 ......

3.2 Converting Subgraphs to Modules

In the main function, each subgraph is converted into a PyTorch Module that is written to its own Python file.

Every layer in the subgraph becomes a submodule of this Module.

for i, subgraph in enumerate(subgraphs):
    module_name = "Stage%d" % i
    module_filename = "stage%d.py" % i
    if len(subgraphs) == 1:
        module_name = args.model_name
        module_filename = "%s.py" % args.arch
    # Convert this subgraph to a module.
    num_inputs, num_outputs = convert_subgraph_to_module(subgraph, full_graph, len(subgraphs),
                                                         module_name, initialize_weights,
                                                         args.model_template_filename,
                                                         os.path.join(args.output_directory,
                                                                      module_filename))
    print("Done generating %s..." % module_filename)

convert_subgraph_to_module takes a subgraph such as the following one (stage 1, containing node5 and node6) and converts it into a module:

graph = {Graph}
 edges = {dict: 1}
 in_edges = {dict: 1}
 marked_nodes = {set: 2}
 nodes = {dict: 2}
  'node5' = {Node} node5 -- EmuBidirLSTM( (bidir): LSTM(1024, 1024, bidirectional=True) (layer1): LSTM(1024, 1024) (layer2): LSTM(1024, 1024)) -- forward_compute_time=5.247, backward_compute_time=0.016, activation_size=12582912.0, parameter_size=67174400.000 -- stage_id=1
  'node6' = {Node} node6 -- Dropout(p=0.2) -- forward_compute_time=0.077, backward_compute_time=0.196, activation_size=12582912.0, parameter_size=0.000 -- stage_id=1
 __len__ = {int} 2

Let’s go through them one by one.

3.2.1 Converting a Subgraph to a Module

convert_subgraph_to_module works as follows:

  • Call get_input_names to traverse the graph and find the inputs of this subgraph.

  • For each input, emit a clone statement at the start of forward(), in preparation for the later code generation, for example ['out0 = input0.clone()', 'out1 = input1.clone()'].

  • Iterate over the nodes in topological order and generate Python statements from each node's attributes:

    • Derive the layer's name, its output name, and its declaration (the node description, with inplace rewritten to inplace=True).
    • If the node needs a special definition, transform it specially, for example by adding an import statement and a hand-built layer declaration.
    • Deduplicate the import statements.
    • If the node description does not start with a whitelisted prefix, record its (name, declaration) pair so that a declaration statement is generated for it in __init__.
    • Get the node's incoming edges.
    • If the node is a built-in operator (such as Size or Add), construct the corresponding Python statement directly.
    • Otherwise, emit a call to the layer, such as 'out2 = self.layer2(out0, out1)'.
  • Ensure that the module's outputs are returned in the same order as in the original model.

  • If weights need to be initialized, add an _initialize_weights() method.

  • Fill the template file with the Python statements generated above to produce the model source.

  • Write the result to the model's Python file.

The comments in the following code show the values of some variables at runtime.

def convert_subgraph_to_module(graph, full_graph, num_subgraphs, module_name, initialize_weights,
                               model_template_filename, output_filename):
    model_template = open(model_template_filename, 'r').read()
    nodes = graph.topological_sort()
    import_statements = []
    module_methods = []

    counter = 0
    layer_names = {}
    layer_names_and_declarations = []
    function_definition = []
    # get_input_names traverses the graph to find the inputs of this subgraph.
    input_names = get_input_names(graph, full_graph)
    num_inputs = len(input_names)
    output_names = input_names.copy()
    sources = graph.sources()

    # Now, generate expressions for each node.
    # Iterate through nodes in topological order, and add output_name mappings for
    # each expression. Use this output_name mapping when generating expressions
    # in the model's implementation file.
    # TODO: Make sure that nodes with multiple inputs have the inputs in the
    # right order (even though this probably does not matter in practice).

    # Build the first part of forward(), in preparation for generating the code.
    for node_id in input_names:
        output_name = "out%d" % counter
        function_definition.append("%s = %s.clone()" % (output_name,
                                                        input_names[node_id]))
        output_names[node_id] = output_name
        counter += 1
    # function_definition is now ['out0 = input0.clone()', 'out1 = input1.clone()']

    # Iterate over the nodes in the graph.
    for node in nodes:
        # Layer-related information.
        layer_call = None
        layer_name = "self.layer%d" % counter
        output_name = "out%d" % counter
        layer_declaration = "torch.nn.%s" % (
            node.node_desc.replace("inplace", "inplace=True"))
        layer_names[node.node_id] = layer_name
        if node.node_id not in output_names:
            output_names[node.node_id] = output_name

        # Skip layers that don't need a declaration (example: '+=').
        for declaration in declaration_specialcase:
            # If the node needs a special definition, transform it specially (imports, layer declarations, etc.).
            if node.node_desc.startswith(declaration):
                found = True
                if declaration == "EmuBidirLSTM":
                    m = re.search(r'.*LSTM\((\d+), (\d+)\).*', node.node_desc)
                    input_size = int(m.group(1))
                    hidden_size = int(m.group(2))
                    layer_declaration = "EmuBidirLSTM(%d, %d)" % (input_size, hidden_size)
                    import_statements.append("from seq2seq.models.encoder import EmuBidirLSTM")
                    # Here import_statements is ['from seq2seq.models.encoder import EmuBidirLSTM'] and layer_declaration is 'EmuBidirLSTM(1024, 1024)'
                    
                elif declaration == "RecurrentAttention":
                    m = re.search(r'.*LSTM\((\d+), (\d+)\).*', node.node_desc)
                    input_size = int(m.group(1))
                    hidden_size = int(m.group(2))
                    m = re.search(r'.*in_features=(\d+), out_features=(\d+).*', node.node_desc)
                    context_size = int(m.group(1))
                    layer_declaration = "RecurrentAttention(%d, %d, %d)" % (input_size, hidden_size, context_size)
                    import_statements.append("from seq2seq.models.decoder import RecurrentAttention")

                # (Some code omitted.)
                    
                elif declaration == "InferenceBatchSoftmax":
                    layer_declaration = "InferenceBatchSoftmax()"
                    import_statements.append("from model import InferenceBatchSoftmax")
                break

        # Deduplicate the import statements.
        import_statements = list(set(import_statements))
        # If the node description is not in the declaration whitelist, record a declaration for it.
        found = False
        for declaration in declaration_whitelist:
            if node.node_desc.startswith(declaration):
                found = True
        if not found:
            layer_names_and_declarations.append((layer_name, layer_declaration))

        # Get the node's incoming edges.
        if node.node_id in full_graph.in_edges:
            in_edges = full_graph.in_edges[node.node_id]
        else:
            in_edges = []
        if len(in_edges) == 0 and node.node_desc.startswith("Input"):
            pass  # Don't need to do anything for this case.
        else:
            # Check whether the node is a built-in operator.
            # For example, node_desc is 'EmuBidirLSTM(  (bidir): LSTM(1024, 1024, bidirectional=True)  (layer1): LSTM(1024, 1024)  (layer2): LSTM(1024, 1024))'

            if node.node_desc.startswith("Size"):
                assert(len(in_edges) == 1)
                m = re.search(r'Size\((-?\d+)\)', node.node_desc)
                idx = int(m.group(1))
                layer_call = "%s = %s.size(%d)" % (output_name,
                                                   output_names[in_edges[0].node_id],
                                                   idx)

            elif node.node_desc.startswith("Add"):
                assert(len(in_edges) == 2)
                node1 = in_edges[0]
                node2 = in_edges[1]
                if len(full_graph.edges[node1.node_id]) > 1:
                    tmp = node1
                    node1 = node2
                    node2 = tmp
                layer_call = "%s = %s + %s" % (output_names[node1.node_id],
                                               output_names[node1.node_id],
                                               output_names[node2.node_id])
                output_names[node.node_id] = output_names[node1.node_id]

            # (Some code omitted.)
            
            elif node.node_desc.startswith("hidden"):
                pass
            elif node.node_desc == "self.get_seq_lens":
                assert(len(in_edges) == 1)
                in_node = in_edges[0]
                layer_call = "%s = %s(%s)" % (output_name, node.node_desc, output_names[in_node.node_id])
            else:
                # Not a built-in operator: call the layer directly, e.g. 'out2 = self.layer2(out0, out1)'.
                layer_call = "%s = %s(%s)" % (output_name, layer_name,
                                              ", ".join([output_names[in_node.node_id]
                                                         for in_node in in_edges]))
        if layer_call is not None:
            function_definition.append(layer_call)
        counter += 1

    # Ensure that outputs of a module are returned in the same order as
    # the original model implementation.
    # TODO: This might not work as intended for sub-graphs
    full_graph.populate_depths()
    graph_output_names, _ = get_output_names(graph, full_graph, 0)
    for key in graph_output_names:
        graph_output_names[key] = output_names[key]
    output_names_list = get_tensor_names_list(graph_output_names)
    num_outputs = len(output_names_list)
    function_definition.append("return %s" %
        get_output_tuple_str(output_names_list))
    # function_definition is now ['out0 = input0.clone()', 'out1 = input1.clone()', 'out2 = self.layer2(out0, out1)', 'out3 = self.layer3(out2)', 'return out3']
    
    
    # Layer declarations are added to the constructor of the module.
    # Function definitions are added to the `forward()' method of the
    # module.
    layer_declarations_str = "\n        ".join([
        "%s = %s" % (x[0], x[1]) for x in layer_names_and_declarations])

    # If weights need to be initialized, add an _initialize_weights() method.
    if initialize_weights:
        layer_declarations_str += "\n        self._initialize_weights()"
        module_methods.append("""def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, torch.nn.Conv2d):
                torch.nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    torch.nn.init.constant_(m.bias, 0)
            elif isinstance(m, torch.nn.BatchNorm2d):
                torch.nn.init.constant_(m.weight, 1)
                torch.nn.init.constant_(m.bias, 0)
            elif isinstance(m, torch.nn.Linear):
                torch.nn.init.normal_(m.weight, 0, 0.01)
                torch.nn.init.constant_(m.bias, 0)""")
    function_definition_str = "\n        ".join(function_definition)

    # function_definition_str joins the statements above with a newline plus 8 spaces of indentation.
    input_names_list = get_tensor_names_list(input_names)
    input_names = ", ".join(input_names_list)

    # input_names is now 'input1, input0'
    # Apply the template file to generate the model.
    model = model_template % {"layer_declarations": layer_declarations_str,
                              "function_definition": function_definition_str,
                              "module_name": module_name,
                              "inputs": input_names,
                              "import_statements": "\n".join(import_statements),
                              "module_methods": "\n\n".join(module_methods)}
    # Write the model's Python file.
    with open(output_filename, 'w') as f:
        f.write(model)
    return num_inputs, num_outputs
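The heart of the loop above is a mapping from node IDs to output variable names plus a running counter. The following is a much-reduced sketch of that idea, under stated assumptions: generate_forward and the toy node tuples are hypothetical, only the Add case and the generic layer call are modeled, and the return statement simply uses the last node's output instead of get_output_names. The original also swaps Add operands so that the operand with a single consumer is the one overwritten; that detail is omitted here.

```python
from typing import Dict, List, Tuple

# Hypothetical toy input: (node_id, node_desc, predecessor ids) in topological order.
ToyNode = Tuple[str, str, List[str]]


def generate_forward(input_ids: List[str], nodes: List[ToyNode]):
    """Simplified sketch of the statement generation in convert_subgraph_to_module."""
    counter = 0
    output_names: Dict[str, str] = {}
    declarations: List[str] = []
    body: List[str] = []

    # Every subgraph input is cloned first: out0 = input0.clone(), ...
    for i, node_id in enumerate(input_ids):
        output_names[node_id] = "out%d" % counter
        body.append("out%d = input%d.clone()" % (counter, i))
        counter += 1

    for node_id, desc, preds in nodes:
        layer_name = "self.layer%d" % counter
        output_name = "out%d" % counter
        args = [output_names[p] for p in preds]
        if desc == "Add":
            # Whitelisted: no declaration; accumulate in place into the first operand.
            body.append("%s = %s + %s" % (args[0], args[0], args[1]))
            output_names[node_id] = args[0]
        else:
            declarations.append("%s = torch.nn.%s" % (layer_name, desc))
            body.append("%s = %s(%s)" % (output_name, layer_name, ", ".join(args)))
            output_names[node_id] = output_name
        counter += 1

    # Simplification: return the output of the last node only.
    body.append("return %s" % output_names[nodes[-1][0]])
    return declarations, body


decls, body = generate_forward(
    ["x"],
    [("drop", "Dropout(p=0.2)", ["x"]),
     ("fc", "Linear(1024, 1024)", ["drop"]),
     ("add", "Add", ["fc", "x"])])
print("\n".join(decls))
print("\n".join(body))
```

Running it prints the two declarations (self.layer1 for the Dropout, self.layer2 for the Linear) followed by a forward body ending in "out2 = out2 + out0" and "return out2", mirroring how a residual Add reuses an existing output name rather than creating a new one.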

3.2.2 Template File

The code above generates the model by filling in a template file, optimizer/templates/model.py.template, with the Python statements produced during conversion. The template's content is as follows:

import torch
%(import_statements)s

class %(module_name)s(torch.nn.Module):
    def __init__(self):
        super(%(module_name)s, self).__init__()
        %(layer_declarations)s

    %(module_methods)s

    def forward(self, %(inputs)s):
        %(function_definition)s
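The filling step is ordinary printf-style formatting with a dictionary. The sketch below feeds the template above with the Stage1 values shown in the code comments earlier; the variable names are illustrative. Because several statements go into one placeholder, they are joined with a newline plus eight spaces so that each lands at method-body indentation.

```python
# The template text above, reproduced as a Python string.
MODEL_TEMPLATE = """import torch
%(import_statements)s

class %(module_name)s(torch.nn.Module):
    def __init__(self):
        super(%(module_name)s, self).__init__()
        %(layer_declarations)s

    %(module_methods)s

    def forward(self, %(inputs)s):
        %(function_definition)s
"""

layer_declarations = ["self.layer2 = EmuBidirLSTM(1024, 1024)",
                      "self.layer3 = torch.nn.Dropout(p=0.2)"]
function_definition = ["out0 = input0.clone()", "out1 = input1.clone()",
                       "out2 = self.layer2(out0, out1)", "out3 = self.layer3(out2)",
                       "return out3"]

source = MODEL_TEMPLATE % {
    "import_statements": "from seq2seq.models.encoder import EmuBidirLSTM",
    "module_name": "Stage1",
    # Newline plus 8 spaces keeps every statement inside the method body.
    "layer_declarations": "\n        ".join(layer_declarations),
    "module_methods": "",
    "inputs": "input1, input0",
    "function_definition": "\n        ".join(function_definition),
}
print(source)
# compile() only checks syntax; it does not import torch or seq2seq.
compile(source, "stage1.py", "exec")
```

With module_methods empty, its placeholder line is left holding only whitespace, which Python ignores, so the generated file still compiles.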

3.2.3 Generated Files

In the code above, the following statements write the model files, one Python file per subgraph:

Outputs num_inputs with open(output_filename, 'w') as f: f.write(model) return num_inputs, num_inputsCopy the code
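The write step itself is plain file I/O. As a minimal sketch, assuming a temporary output directory and hand-written stage sources standing in for the generated ones (both hypothetical), the following writes one file per stage and loads each back with importlib, roughly the way a package of generated stages is consumed later.

```python
import importlib.util
import os
import tempfile

# Hypothetical generated sources, one per stage (stand-ins for real stage files).
stage_sources = {
    "stage0": "class Stage0(object):\n    def forward(self, input0):\n        return input0 + 1\n",
    "stage1": "class Stage1(object):\n    def forward(self, input0):\n        return input0 * 10\n",
}


def write_and_load(output_directory, sources):
    """Write each source to <name>.py and import it as a module."""
    modules = {}
    for name, source in sources.items():
        path = os.path.join(output_directory, "%s.py" % name)
        with open(path, "w") as f:
            f.write(source)
        spec = importlib.util.spec_from_file_location(name, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)  # run the file's code in the new module
        modules[name] = module
    return modules


with tempfile.TemporaryDirectory() as output_directory:
    modules = write_and_load(output_directory, stage_sources)
    stage0 = modules["stage0"].Stage0()
    stage1 = modules["stage1"].Stage1()
    print(stage1.forward(stage0.forward(1)))  # 20
```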

For example, the generated file for Stage 0 is:

import torch

class Stage0(torch.nn.Module):
    def __init__(self):
        super(Stage0, self).__init__()
        self.layer6 = torch.nn.Embedding(32320, 1024, padding_idx=0)

    def forward(self, input0, input1, input2):
        out0 = input0.clone()
        out1 = input1.clone()
        out2 = input2.clone()
        out6 = self.layer6(out0)
        return (out1, out2, out6)

Another example is the file for Stage 1:

import torch
from seq2seq.models.encoder import EmuBidirLSTM

class Stage1(torch.nn.Module):
    def __init__(self):
        super(Stage1, self).__init__()
        self.layer2 = EmuBidirLSTM(1024, 1024)
        self.layer3 = torch.nn.Dropout(p=0.2)

    def forward(self, input1, input0):
        out0 = input0.clone()
        out1 = input1.clone()
        out2 = self.layer2(out0, out1)
        out3 = self.layer3(out2)
        return out3

3.3 Fusion Model

In the previous section, we generated one Python file per subgraph, each defining one module. The purpose of this section is to merge these subgraphs back into one large graph. In terms of Python code, this means generating a new Python file that imports the file of each subgraph and defines an overall module.

3.3.1 Main function logic

The logic of main is as follows:

  • Build the initial import statement from the arguments, producing something like ['from .gnmt import pd'].
  • Fuse the subgraphs into one overall module.
  • Extend the model list based on the fusion results.
  • Extend the import statements based on the fusion results.
model = []

# ['from .gnmt import pd']
import_statements = ["from .%s import %s" % (args.arch, args.model_name)]
pytorch_modules = None
if len(subgraphs) > 1:
    # Fuse the subgraphs into one overall module
    python_modules, pytorch_modules, subgraph_inputs, subgraph_outputs = \
        fuse_subgraphs_to_module(full_graph, subgraphs, args.model_name,
                                 initialize_weights,
                                 args.model_template_filename,
                                 os.path.join(args.output_directory,
                                              "%s.py" % args.arch))
    # Extend the model based on the fusion results
    model = ["(%s(), [%s], [%s])" % (x[0],
                                     ", ".join(["\"%s\"" % y for y in x[1]]),
                                     ", ".join(["\"%s\"" % y for y in x[2]]))
             for x in zip(pytorch_modules, subgraph_inputs,
                          subgraph_outputs)]
    model.append("(criterion, [\"%s\"], [\"loss\"])" % subgraph_outputs[-1][0])
    # Extend the import statements
    import_statements.extend(
        ["from .%s import %s" % (python_module, pytorch_module)
         for (python_module, pytorch_module) in zip(python_modules, pytorch_modules)])
else:
    inputs = ["\"input%d\"" % i for i in range(num_inputs)]
    assert(num_outputs == 1)
    model.append("(%s.%s(), [%s], [\"output\"])" % (args.arch, args.model_name, ", ".join(inputs)))
    model.append("(criterion, [\"output\"], [\"loss\"])")
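The list comprehension that builds model is easiest to follow on concrete data. The sketch below feeds the same formatting logic the first two stages from the debugger dump in section 3.4 and reproduces the corresponding entries; build_model_entries is a hypothetical wrapper, not a function in PipeDream.

```python
def build_model_entries(pytorch_modules, subgraph_inputs, subgraph_outputs):
    """Format (module, inputs, outputs) tuples as source strings, as main does."""
    def quoted(names):
        return ", ".join('"%s"' % name for name in names)

    model = ["(%s(), [%s], [%s])" % (module, quoted(inputs), quoted(outputs))
             for module, inputs, outputs in zip(pytorch_modules, subgraph_inputs,
                                                subgraph_outputs)]
    # The loss criterion consumes the first output of the last stage.
    model.append('(criterion, ["%s"], ["loss"])' % subgraph_outputs[-1][0])
    return model


model = build_model_entries(
    ["Stage0", "Stage1"],
    [["input0", "input1", "input2"], ["out2", "out0"]],
    [["out2", "out3", "out0"], ["out4"]],
)
for entry in model:
    print(entry)
# (Stage0(), ["input0", "input1", "input2"], ["out2", "out3", "out0"])
# (Stage1(), ["out2", "out0"], ["out4"])
# (criterion, ["out4"], ["loss"])
```

The tensor names are wrapped in double quotes because these strings are later pasted verbatim into the generated __init__.py as Python source.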

3.3.2 Fusion model

fuse_subgraphs_to_module generates the gnmt.py file. Its logic is:

  • Load the template.
  • Build the module names: StageN for the classes and stageN for the files.
  • Build the layer declarations and the function definition.
  • Iterate over the subgraphs, building the inputs and outputs of each.
  • Add the return statement.
  • Add the import statements.
  • Apply the template file.
  • Write the output file.

The source code is as follows:

def fuse_subgraphs_to_module(graph, subgraphs, model_name, initialize_weights,
                             model_template_filename, output_filename):
    # Load the template
    model_template = open(model_template_filename, 'r').read()

    # PyTorch modules are the names given to the generated stages (which are
    # of type torch.nn.Module).
    # Python modules are the names given to the filenames containing these
    # generated torch.nn.Modules.
    pytorch_modules = []
    python_modules = []
    for i in range(len(subgraphs)):
        pytorch_modules.append("Stage%d" % i)
        python_modules.append("stage%d" % i)

    # python_modules = {list: 10} ['stage0', 'stage1', 'stage2', 'stage3', 'stage4', 'stage5', 'stage6', 'stage7', 'stage8', 'stage9']
    # pytorch_modules = {list: 10} ['Stage0', 'Stage1', 'Stage2', 'Stage3', 'Stage4', 'Stage5', 'Stage6', 'Stage7', 'Stage8', 'Stage9']

    # Build the layer declarations and the function definition
    layer_declarations = []
    function_definition = []
    for i, pytorch_module in enumerate(pytorch_modules):
        layer_declarations.append("self.stage%d = %s()" % (
            i, pytorch_module))
    if initialize_weights:
        layer_declarations.append("self._initialize_weights()")

    # function_definition = {list: 0} []
    # layer_declarations = {list: 10} ['self.stage0 = Stage0()', 'self.stage1 = Stage1()', 'self.stage2 = Stage2()', 'self.stage3 = Stage3()', 'self.stage4 = Stage4()', 'self.stage5 = Stage5()', 'self.stage6 = Stage6()', 'self.stage7 = Stage7()', 'self.stage8 = Stage8()', 'self.stage9 = Stage9()']

    output_counter = 0
    output_names = {}
    graph_input_names = get_input_names(graph, graph, check_stages=False)
    for key in graph_input_names:
        output_names[key] = graph_input_names[key]
    subgraph_inputs = []
    subgraph_outputs = []

    # Build the inputs and outputs of each subgraph
    for i, subgraph in enumerate(subgraphs):
        subgraph_input_names = get_input_names(subgraph, graph)
        subgraph_output_names, output_counter = get_output_names(
            subgraph, graph, output_counter)
        for key in subgraph_input_names:
            subgraph_input_names[key] = output_names[key]
        for key in subgraph_output_names:
            output_names[key] = subgraph_output_names[key]

        function_definition.append("%s = self.stage%d(%s)" % (
            get_output_tuple_str(get_tensor_names_list(subgraph_output_names)),
            i, ", ".join(get_tensor_names_list(subgraph_input_names))))
        subgraph_inputs.append(get_tensor_names_list(subgraph_input_names))
        subgraph_outputs.append(get_tensor_names_list(subgraph_output_names))

    # subgraph_input_names = {dict: 1} {'node47': 'out20'}
    # subgraph_inputs = {list: 10} [['input0', 'input1', 'input2'], ['out2', 'out0'], ['out4'], ['out5'], ['out7', 'out6'], ['out8', 'out9', 'out2', 'out3'], ['out10', 'out12'], ['out14', 'out15', 'out16', 'out11'], ['out14', 'out17', 'out18', 'out19'], ['out20']]
    # subgraph_output_names = {dict: 1} {'node48': 'out21'}
    # subgraph_outputs = {list: 10} [['out2', 'out3', 'out0'], ['out4'], ['out5'], ['out7', 'out6'], ['out8', 'out9'], ['out10', 'out12', 'out11'], ['out14', 'out15', 'out16'], ['out17', 'out18', 'out19'], ['out20'], ['out21']]

    # Add the return statement (the outputs of the last subgraph)
    function_definition.append("return %s" %
        get_output_tuple_str(get_tensor_names_list(subgraph_output_names)))
    function_definition_str = "\n        ".join(function_definition)

    # Add the import statements
    import_statements = ["from .%s import %s" % (python_module, pytorch_module)
                         for (python_module, pytorch_module) in zip(python_modules, pytorch_modules)]
    input_names = get_input_names(graph, graph, check_stages=False)
    input_names = ", ".join(get_tensor_names_list(input_names))

    # Apply the template file
    model = model_template % {"layer_declarations": "\n        ".join(layer_declarations),
                              "function_definition": function_definition_str,
                              "module_name": model_name,
                              "inputs": input_names,
                              "import_statements": "\n".join(import_statements),
                              "module_methods": ""}  # TODO: Figure out if we need to pass in other module_methods here?

    print("Done with sub-graph fusion...")

    # Write the output file
    with open(output_filename, 'w') as f:
        f.write(model)
    return python_modules, pytorch_modules, subgraph_inputs, subgraph_outputs
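The core of the fusion is bookkeeping: output_names maps each graph node to the tensor name that carries its value, so a later stage can look up the names of its inputs, including outputs produced several stages earlier. The sketch below is a simplified, hypothetical re-creation of that idea. It assumes each stage is described only by the node IDs it consumes and produces, and output_tuple_str is a stand-in written in the spirit of get_output_tuple_str (one name stays bare, several become a tuple); it does not use PipeDream's graph classes, and its naming counter is simpler than the real get_output_names.

```python
def output_tuple_str(names):
    # One output stays bare; several are packed into a tuple expression.
    return names[0] if len(names) == 1 else "(%s)" % ", ".join(names)


def fuse(graph_inputs, stages):
    """stages: list of (input_node_ids, output_node_ids) in pipeline order."""
    if not stages:
        raise ValueError("need at least one stage")
    # Graph inputs are named input0, input1, ... like in the generated files.
    output_names = {node: "input%d" % i for i, node in enumerate(graph_inputs)}
    output_counter = 0
    function_definition = []
    for i, (input_nodes, output_nodes) in enumerate(stages):
        # Inputs must already have been produced by the graph or an earlier stage.
        input_names = [output_names[node] for node in input_nodes]
        produced = []
        for node in output_nodes:
            output_names[node] = "out%d" % output_counter
            produced.append(output_names[node])
            output_counter += 1
        function_definition.append("%s = self.stage%d(%s)" % (
            output_tuple_str(produced), i, ", ".join(input_names)))
    # The fused forward returns the outputs of the last stage.
    function_definition.append("return %s" % output_tuple_str(produced))
    return function_definition


lines = fuse(["src", "tgt"], [
    (["src", "tgt"], ["emb", "mask"]),  # stage 0
    (["emb"], ["enc"]),                 # stage 1
    (["enc", "mask"], ["logits"]),      # stage 2 reuses an output of stage 0
])
print("\n".join(lines))
# (out0, out1) = self.stage0(input0, input1)
# out2 = self.stage1(out0)
# out3 = self.stage2(out2, out1)
# return out3
```

Stage 2 reading out1 directly from stage 0 mirrors the real output, where stage5 consumes out2 and out3 from stage0.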

3.3.3 Output

The final fused file, gnmt.py, is as follows:

import torch
from .stage0 import Stage0
from .stage1 import Stage1
from .stage2 import Stage2
from .stage3 import Stage3
from .stage4 import Stage4
from .stage5 import Stage5
from .stage6 import Stage6
from .stage7 import Stage7
from .stage8 import Stage8
from .stage9 import Stage9

class pd(torch.nn.Module):
    def __init__(self):
        super(pd, self).__init__()
        self.stage0 = Stage0()
        self.stage1 = Stage1()
        self.stage2 = Stage2()
        self.stage3 = Stage3()
        self.stage4 = Stage4()
        self.stage5 = Stage5()
        self.stage6 = Stage6()
        self.stage7 = Stage7()
        self.stage8 = Stage8()
        self.stage9 = Stage9()

    def forward(self, input0, input1, input2):
        (out2, out3, out0) = self.stage0(input0, input1, input2)
        out4 = self.stage1(out2, out0)
        out5 = self.stage2(out4)
        (out7, out6) = self.stage3(out5)
        (out8, out9) = self.stage4(out7, out6)
        (out10, out12, out11) = self.stage5(out8, out9, out2, out3)
        (out14, out15, out16) = self.stage6(out10, out12)
        (out17, out18, out19) = self.stage7(out14, out15, out16, out11)
        out20 = self.stage8(out14, out17, out18, out19)
        out21 = self.stage9(out20)
        return out21

3.4 The __init__ file

For ease of use, an __init__ file is generated.

It is filled in from import_statements, model, and a few command-line arguments.

The variables are as follows:

model = {list: 11} 
 00 = {str} '(Stage0(), ["input0", "input1", "input2"], ["out2", "out3", "out0"])'
 01 = {str} '(Stage1(), ["out2", "out0"], ["out4"])'
 02 = {str} '(Stage2(), ["out4"], ["out5"])'
 03 = {str} '(Stage3(), ["out5"], ["out7", "out6"])'
 04 = {str} '(Stage4(), ["out7", "out6"], ["out8", "out9"])'
 05 = {str} '(Stage5(), ["out8", "out9", "out2", "out3"], ["out10", "out12", "out11"])'
 06 = {str} '(Stage6(), ["out10", "out12"], ["out14", "out15", "out16"])'
 07 = {str} '(Stage7(), ["out14", "out15", "out16", "out11"], ["out17", "out18", "out19"])'
 08 = {str} '(Stage8(), ["out14", "out17", "out18", "out19"], ["out20"])'
 09 = {str} '(Stage9(), ["out20"], ["out21"])'
 10 = {str} '(criterion, ["out21"], ["loss"])'
 __len__ = {int} 11
 
 
 import_statements = {list: 1} ['from .gnmt import pd']
 0 = {str} 'from .gnmt import pd'

The code is as follows:

    with open(os.path.join(args.output_directory, "__init__.py"), 'w') as f1, \
         open(args.init_template_filename, 'r') as f2:
        template = f2.read()
        init = template % {
            "arch": args.arch,
            "import_statements": "\n".join(import_statements),
            "model": ",\n        ".join(model),
            "full_model": "%s()" % args.model_name
        }
        f1.write(init)

The resulting __init__ file looks like this:

from .gnmt import pd
from .stage0 import Stage0
from .stage1 import Stage1
from .stage2 import Stage2
from .stage3 import Stage3
from .stage4 import Stage4
from .stage5 import Stage5
from .stage6 import Stage6
from .stage7 import Stage7
from .stage8 import Stage8
from .stage9 import Stage9

def arch():
    return "gnmt"

def model(criterion):
    return [
        (Stage0(), ["input0", "input1", "input2"], ["out2", "out3", "out0"]),
        (Stage1(), ["out2", "out0"], ["out4"]),
        (Stage2(), ["out4"], ["out5"]),
        (Stage3(), ["out5"], ["out7", "out6"]),
        (Stage4(), ["out7", "out6"], ["out8", "out9"]),
        (Stage5(), ["out8", "out9", "out2", "out3"], ["out10", "out12", "out11"]),
        (Stage6(), ["out10", "out12"], ["out14", "out15", "out16"]),
        (Stage7(), ["out14", "out15", "out16", "out11"], ["out17", "out18", "out19"]),
        (Stage8(), ["out14", "out17", "out18", "out19"], ["out20"]),
        (Stage9(), ["out20"], ["out21"]),
        (criterion, ["out21"], ["loss"])
    ]

def full_model():
    return pd()
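Each entry of model(criterion) names a module, the tensors it reads, and the tensors it writes. As a hedged illustration of why this format is convenient, the sketch below runs such a list sequentially in a single process by keeping a name-to-value dictionary. The toy stages and criterion are hypothetical plain-Python callables standing in for the generated Stage modules; a real pipeline runtime places the stages on different workers rather than running them in one loop.

```python
def run_sequentially(model, inputs):
    """Execute (module, input_names, output_names) entries in order."""
    tensors = dict(inputs)
    for module, input_names, output_names in model:
        result = module(*[tensors[name] for name in input_names])
        # A single output is returned bare; several come back as a tuple.
        if len(output_names) == 1:
            result = (result,)
        assert len(result) == len(output_names), "output count mismatch"
        for name, value in zip(output_names, result):
            tensors[name] = value
    return tensors


def make_model(criterion):
    # Toy stand-ins: stage0 returns two values, stage1 consumes both.
    def stage0(input0):
        return input0 + 1, input0 * 2

    def stage1(out0, out1):
        return out0 + out1

    return [
        (stage0, ["input0"], ["out0", "out1"]),
        (stage1, ["out0", "out1"], ["out2"]),
        (criterion, ["out2"], ["loss"]),
    ]


def criterion(out2):
    return abs(out2 - 10)


tensors = run_sequentially(make_model(criterion), {"input0": 3})
print(tensors["out2"], tensors["loss"])  # 10 0
```

Because inputs and outputs are referenced by name, a stage can read a tensor produced several stages earlier (like Stage5 reading out2 and out3 from Stage0) without any extra plumbing.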

3.5 Configuration File

Next, configuration files are generated for later use when the program runs: dp_conf.json and mp_conf.json are always written, and hybrid_conf.json is written as well when a stage-to-rank mapping is supplied.

These files record which module is placed on which stage, and which stage is placed on which rank(s).

3.5.1 Code logic

The main logic is:

  • If the command-line arguments already specify how stages map to ranks (stage_to_num_ranks_map), parse that mapping.
  • Otherwise, derive the number of stages and modules from pytorch_modules.
  • Build the concrete module-to-stage and stage-to-rank assignments.
  • Write the configuration files.

Here, pytorch_modules is the value returned by fuse_subgraphs_to_module:

pytorch_modules = {list: 10} ['Stage0', 'Stage1', 'Stage2', 'Stage3', 'Stage4', 'Stage5', 'Stage6', 'Stage7', 'Stage8', 'Stage9']

The code is as follows:

if args.stage_to_num_ranks_map is not None:
    # The arguments already specify how many ranks each stage gets, so parse that mapping
    stage_to_num_ranks_map = args.stage_to_num_ranks_map.split(",")
    stage_to_num_ranks_map = [(int(x.split(":")[0]), int(x.split(":")[1]))
                              for x in stage_to_num_ranks_map]
    num_stages = 0
    for (stage_id, replication_factor) in stage_to_num_ranks_map:
        num_stages += replication_factor
    assert(len(stage_to_num_ranks_map) == len(pytorch_modules))
    num_modules = len(pytorch_modules) + 1  # Add 1 for criterion.
elif pytorch_modules is None:
    num_stages = 1
    num_modules = 2  # Add 1 for criterion.
else:
    num_stages = len(pytorch_modules)
    num_modules = len(pytorch_modules) + 1  # Add 1 for criterion.

all_template_args = []
# Data-parallel configuration: every module on stage 0, stage 0 replicated on all ranks
all_template_args.append({
    "module_to_stage_map": [0] * num_modules,
    "stage_to_rank_map": str({"0": list(range(num_stages))}).replace("'", "\"")
})
# Model-parallel configuration: one module per stage (the criterion shares the last stage), one rank per stage
all_template_args.append({
    "module_to_stage_map": list(range(num_modules-1)) + [num_modules-2],
    "stage_to_rank_map": str({str(i): [i] for i in range(num_modules-1)}).replace("'", "\"")
})

# all_template_args = {list: 2}
#  0 = {dict: 2}
#   'module_to_stage_map' = {list: 11} [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
#   'stage_to_rank_map' = {str} '{"0": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]}'
#  1 = {dict: 2}
#   'module_to_stage_map' = {list: 11} [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 9]
#   'stage_to_rank_map' = {str} '{"0": [0], "1": [1], "2": [2], "3": [3], "4": [4], "5": [5], "6": [6], "7": [7], "8": [8], "9": [9]}'

# Hybrid configuration, only if a stage-to-rank mapping was given
if args.stage_to_num_ranks_map is not None:
    stage_to_rank_map = {}
    ranks_so_far = 0
    for i in range(num_modules-1):
        stage_to_rank_map[str(i)] = list(range(ranks_so_far,
                                               ranks_so_far + stage_to_num_ranks_map[i][1]))
        ranks_so_far += stage_to_num_ranks_map[i][1]
    stage_to_rank_map = str(stage_to_rank_map).replace("'", "\"")
    all_template_args.append({
        "module_to_stage_map": list(range(num_modules-1)) + [num_modules-2],
        "stage_to_rank_map": stage_to_rank_map
    })

# Write the configuration files
for conf_filename, template_args in zip(
        ["dp_conf.json", "mp_conf.json", "hybrid_conf.json"], all_template_args):
    with open(os.path.join(args.output_directory, conf_filename), 'w') as f1, \
         open(args.conf_template_filename, 'r') as f2:
        template = f2.read()
        conf = template % template_args
        f1.write(conf)
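The hybrid branch is the least obvious part: the string passed as stage_to_num_ranks_map lists stage_id:replication_factor pairs, and ranks are handed out consecutively. The sketch below isolates that logic in a hypothetical helper, build_stage_to_rank_map, so it can be tried without the rest of the script; the argument format follows the parsing code above, and the example mapping "0:2,1:1,2:3" is made up.

```python
def build_stage_to_rank_map(stage_to_num_ranks_map):
    # Parse "stage_id:replication_factor" pairs, as the code above does.
    pairs = [(int(x.split(":")[0]), int(x.split(":")[1]))
             for x in stage_to_num_ranks_map.split(",")]
    stage_to_rank_map = {}
    ranks_so_far = 0
    # Like the original loop, stages are keyed by position, not by stage_id.
    for i, (_, replication_factor) in enumerate(pairs):
        stage_to_rank_map[str(i)] = list(
            range(ranks_so_far, ranks_so_far + replication_factor))
        ranks_so_far += replication_factor
    # After the loop, ranks_so_far equals num_stages in the code above
    # (the sum of all replication factors).
    return stage_to_rank_map, ranks_so_far


mapping, num_ranks = build_stage_to_rank_map("0:2,1:1,2:3")
print(mapping, num_ranks)               # {'0': [0, 1], '1': [2], '2': [3, 4, 5]} 6
print(str(mapping).replace("'", "\""))  # {"0": [0, 1], "1": [2], "2": [3, 4, 5]}
```

Stage 0 is replicated on ranks 0 and 1, stage 1 runs alone on rank 2, and stage 2 is replicated on ranks 3 to 5, which is the combination of data and model parallelism that hybrid_conf.json describes.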

3.5.2 Data Parallelism

dp_conf.json is the configuration file generated for data parallelism, for example:

{
    "module_to_stage_map": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
    "stage_to_rank_map": {"0": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]}
}

3.5.3 Model parallelism

mp_conf.json is the configuration file generated for model parallelism, for example:

{
    "module_to_stage_map": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 9],
    "stage_to_rank_map": {"0": [0], "1": [1], "2": [2], "3": [3], "4": [4], "5": [5], "6": [6], "7": [7], "8": [8], "9": [9]}
}
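The code above produces JSON by calling str() on a dict and swapping single quotes for double quotes, which works for these simple string keys and integer lists. As a hedged alternative, json.dumps produces the same structure directly. The sketch below builds the model-parallel configuration for a hypothetical 3-stage pipeline and then inverts both maps to answer the question a worker needs to ask: which modules does rank r run? The helper names mp_conf and modules_for_rank are illustrative only.

```python
import json


def mp_conf(num_stages):
    # One module per stage, plus the criterion, which shares the last stage.
    return {
        "module_to_stage_map": list(range(num_stages)) + [num_stages - 1],
        "stage_to_rank_map": {str(i): [i] for i in range(num_stages)},
    }


def modules_for_rank(conf, rank):
    """Return the indices of the modules executed by the given rank."""
    stages = [int(stage) for stage, ranks in conf["stage_to_rank_map"].items()
              if rank in ranks]
    return [module for module, stage in enumerate(conf["module_to_stage_map"])
            if stage in stages]


text = json.dumps(mp_conf(3))
print(text)
# {"module_to_stage_map": [0, 1, 2, 2], "stage_to_rank_map": {"0": [0], "1": [1], "2": [2]}}
conf = json.loads(text)
print(modules_for_rank(conf, 2))  # [2, 3]: the last stage module and the criterion
```

Applied to a data-parallel configuration, where every module maps to stage 0 and stage 0 maps to all ranks, the same lookup returns every module for every rank.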

0x04 Summary

In summary, the model graph is converted into one Python file per stage; these files are then combined into a single top-level Python file, accompanied by an __init__.py and configuration files, that users can use directly.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

WeChat official account: Rosie’s Thoughts

0xFF References

Deep learning pipeline parallel PipeDream(1)– Profile stage

Deep learning pipeline parallel PipeDream(2)– computing partitions