Introduction to Linear Workflow

In a linear workflow, each task must wait for the previous task to complete before it can be executed.
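To make the idea concrete, here is a minimal pure-Python sketch of a linear flow. It is not taskflow's implementation; the `LinearFlow` class, the three task functions, and the keys in the shared `store` dictionary are all hypothetical names chosen for illustration.

```python
# Hypothetical sketch of a linear workflow; not taskflow's implementation.
class LinearFlow:
    """Runs tasks one after another, in the order they were added."""

    def __init__(self, name):
        self.name = name
        self._tasks = []  # the flow starts out empty

    def add(self, *tasks):
        # Tasks are appended, so insertion order is execution order.
        self._tasks.extend(tasks)
        return self

    def run(self, store):
        for task in self._tasks:
            # The next task starts only after this call has returned.
            store.update(task(store) or {})
        return store


def reserve_quota(store):
    return {"reserved": store["size"]}


def create_entry(store):
    # Depends on the value produced by the previous task.
    return {"volume": {"size": store["reserved"], "status": "creating"}}


def commit_quota(store):
    return {"committed": True}


flow = LinearFlow("volume_create_api")
flow.add(reserve_quota, create_entry, commit_quota)
result = flow.run({"size": 10})
print(result["volume"])
```

Because `create_entry` reads a value that `reserve_quota` wrote, the order in which the tasks were added matters: swapping them would raise a `KeyError`.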

Let’s take a look at a real linear_flow example from OpenStack.

The flow is used to create a volume.

The official docstring spells it out clearly: the workflow performs the following six steps.

api_flow = linear_flow.Flow(flow_name) creates the flow, which starts out empty.

api_flow.add(QuotaReserveTask(), EntryCreateTask(db_api), QuotaCommitTask()) adds tasks to that empty flow in the order in which they are to be executed.

Better still, whereas previously we had to start each asynchronous task ourselves in a loop, this flow is run with a single run() call.

The flow_engine that run is called on is the engine returned by get_flow, which ensures that the tasks in the linear_flow are “started asynchronously and executed in an orderly fashion.”

We need to understand what “started asynchronously and executed in an orderly fashion” means. If all we wanted was orderly execution, we could simply write f1(); f2(); f3(). But f1 and f2 are probably not compute-intensive in themselves; they spend most of their time blocked, waiting. So we want the work to be “started asynchronously and executed in an orderly fashion.”
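The following sketch illustrates the idea with asyncio, which is used here only as a stand-in and is not how the flow engine is implemented. The function names and delays are hypothetical, and `asyncio.sleep` plays the role of a blocking wait such as a database or RPC call. Each flow keeps its own steps in order, while two flows are started without waiting for each other.

```python
import asyncio

events = []  # records the order in which steps actually finish


async def step(flow_name, step_name, delay):
    # asyncio.sleep stands in for a blocking wait such as a DB or RPC call.
    await asyncio.sleep(delay)
    events.append((flow_name, step_name))


async def linear_flow(flow_name):
    # Inside one flow, each step is awaited before the next one starts.
    await step(flow_name, "f1", 0.02)
    await step(flow_name, "f2", 0.01)
    await step(flow_name, "f3", 0.01)


async def main():
    # Both flows are started without waiting for each other.
    await asyncio.gather(linear_flow("a"), linear_flow("b"))


asyncio.run(main())

order_a = [s for f, s in events if f == "a"]
order_b = [s for f, s in events if f == "b"]
print(order_a, order_b)
```

Within each flow the steps always finish as f1, f2, f3, but the time one flow spends waiting is available to the other, which is the benefit that plain f1(); f2(); f3() does not give.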

with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
  flow_engine.run()
  vref = flow_engine.storage.fetch('volume')
  LOG.info(_LI("Volume created successfully."), resource=vref)
  return vref

get_flow

def get_flow(db_api, image_service_api, availability_zones, create_what,
             scheduler_rpcapi=None, volume_rpcapi=None):
    """Constructs and returns the api entrypoint flow.

    This flow will do the following:

    1. Inject keys & values for dependent tasks.
    2. Extracts and validates the input keys & values.
    3. Reserves the quota (reverts quota on any failures).
    4. Creates the database entry.
    5. Commits the quota.
    6. Casts to volume manager or scheduler for further processing.
    """

    # ACTION = 'volume:create'
    flow_name = ACTION.replace(":", "_") + "_api"
    api_flow = linear_flow.Flow(flow_name)

    api_flow.add(ExtractVolumeRequestTask(
        image_service_api,
        availability_zones,
        rebind={'size': 'raw_size',
                'availability_zone': 'raw_availability_zone',
                'volume_type': 'raw_volume_type'}))
    api_flow.add(QuotaReserveTask(),
                 EntryCreateTask(db_api),
                 QuotaCommitTask())

    if scheduler_rpcapi and volume_rpcapi:
        # This will cast it out to either the scheduler or volume manager via
        # the rpc apis provided.
        api_flow.add(VolumeCastTask(scheduler_rpcapi, volume_rpcapi, db_api))

    # Now load (but do not run) the flow using the provided initial data.
    return taskflow.engines.load(api_flow, store=create_what)
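Step 3 of the docstring mentions that the quota reservation is reverted on any failure. The sketch below shows one simplified way such revert behaviour can work in a linear flow: when a task fails, the tasks that already completed are undone in reverse order. It is an illustration under stated assumptions, not taskflow's or Cinder's code; `ReserveQuota`, `CreateEntry`, `run_linear`, and the store keys are hypothetical.

```python
# Hypothetical sketch of revert-on-failure; not taskflow's implementation.
class ReserveQuota:
    def execute(self, store):
        store["reserved"] = store["size"]

    def revert(self, store):
        # Give the reservation back if a later step fails.
        store.pop("reserved", None)


class CreateEntry:
    def execute(self, store):
        if store["size"] <= 0:
            raise ValueError("size must be positive")
        store["volume"] = {"size": store["size"]}

    def revert(self, store):
        store.pop("volume", None)


def run_linear(tasks, store):
    done = []
    for task in tasks:
        try:
            task.execute(store)
        except Exception:
            # Undo the tasks that already completed, most recent first.
            for finished in reversed(done):
                finished.revert(store)
            raise
        done.append(task)
    return store


ok = run_linear([ReserveQuota(), CreateEntry()], {"size": 10})

failed_store = {"size": 0}
try:
    run_linear([ReserveQuota(), CreateEntry()], failed_store)
except ValueError:
    pass
print(ok, failed_store)
```

In the failing run the reservation made by the first task is removed again, so the store ends up as it started. This is why the quota is reserved in its own task before the database entry is created, rather than doing both in one step.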