This is the third article in a series called Asynchronous processing in Android and iOS Development. In this article, we’ll focus on some of the problems you might encounter when performing multiple asynchronous tasks.

Often we need to perform multiple asynchronous tasks that work together to fulfill requirements. Based on typical application scenarios, this article explains three kinds of collaboration relationships of asynchronous tasks:

  • Sequential execution
  • Execute concurrently, result merge
  • Concurrent execution, one party takes precedence

This paper takes three application scenarios as examples to discuss the above three collaboration relationships. The three application scenarios are as follows:

  • Multistage cache
  • Concurrent network request
  • Page caching

Finally, this article will try to give a case of using a framework such as RxJava to implement “concurrent network requests” and discuss it.

Note: The code that appears in this series has been collated to GitHub (which is constantly updated), and the repository address is:

  • Github.com/tielei/Asyn…

Among them, the Java code in this article, the com. Zhangtielei. Demos. Async. Programming. The multitask in this package.


Multiple asynchronous tasks are executed successively

Sequential execution indicates that an asynchronous task is started first, and the next asynchronous task is started after the callback occurs. This is the easiest way for multiple asynchronous tasks to collaborate.

A typical example is multi-level caching of static resources, of which the most popular example is the multi-level caching of static images. When loading a static image from the client, there are at least two levels of Cache: the first level of Memory Cache and the second level of Disk Cache. The loading process is as follows:

  1. If a match is found in the Memory Cache, the result is returned. Otherwise, go to the next step
  2. If a Disk Cache is hit, the Disk Cache is returned. Otherwise, go to the next step
  3. Initiate network requests to download and decode image files.

Typically, the first step of finding the Memory Cache is a synchronization task. Steps 2 and 3 are asynchronous tasks, for the same image loading tasks, between these two steps is “successively succeeded to perform” relationship: “find the Disk Cache” asynchronous task is completed (the callback), according to the result of a Cache hit again decided to don’t start to “start a network request” asynchronous tasks.

The following code shows the start and execution of two asynchronous tasks, “find Disk Cache” and “make network request”.

First, we need to define the interfaces for the asynchronous tasks “Disk Cache” and “network request”.

public interface ImageDiskCache {
    /** * Asynchronously obtain the cached Bitmap object@param key
     * @paramCallback is used to return the cached Bitmap object */
    void getImage(String key, AsyncCallback<Bitmap> callback);
    /** * Saves the Bitmap object to the cache@param key
     * @paramBitmap Indicates the bitmap object * to save@paramCallback returns success or failure as a result of the current save operation. */
    void putImage(String key, Bitmap bitmap, AsyncCallback<Boolean> callback);
}Copy the code

The ImageDiskCache interface is the DiskCache used to access images. AsyncCallback is a generic asynchronous callback interface. It is defined as follows (and will be used later in this article) :

/** * A generic callback interface definition. Used to return a parameter. *@param<D> Parameter data type returned by the asynchronous interface. */
public interface AsyncCallback <D> {
    void onResult(D data);
}Copy the code

To initiate a network request to download the image file, we directly call the Downloader interface described in the previous article “Asynchronous processing in Android and iOS Development (II) — Callbacks for asynchronous tasks” (note: use the version of Dowanloder interface with contextData parameter).

Here are examples of code for “find Disk Cache” and “initiate network download request” :

    // Check level-2 cache: disk cache
    imageDiskCache.getImage(url, new AsyncCallback<Bitmap>() {
        @Override
        public void onResult(Bitmap bitmap) {
            if(bitmap ! =null) {
                // Disk Cache hit, and the loading task ended prematurely.
                imageMemCache.putImage(url, bitmap);
                successCallback(url, bitmap, contextData);
            }
            else {
                // If both caches fail, call downloader to downloaddownloader.startDownload(url, getLocalPath(url), contextData); }}});Copy the code

The following is an example of the code to implement the Downloader successful result callback:

    @Override
    public void downloadSuccess(final String url, final String localPath, final Object contextData) {
        // Decode images asynchronously, which is a time-consuming operation
        imageDecodingExecutor.execute(new Runnable() {
            @Override
            public void run(a) {
                final Bitmap bitmap = decodeBitmap(new File(localPath));
                // reroute the main thread
                mainHandler.post(new Runnable() {
                    @Override
                    public void run(a) {
                        if(bitmap ! =null) {
                            imageMemCache.putImage(url, bitmap);
                            imageDiskCache.putImage(url, bitmap, null);
                            successCallback(url, bitmap, contextData);
                        }
                        else {
                            // Decoding failedfailureCallback(url, ImageLoaderListener.BITMAP_DECODE_FAILED, contextData); }}}); }}); }Copy the code

Multiple asynchronous tasks are executed concurrently, and the results are merged

“Concurrent execution, result merge” refers to the simultaneous start of multiple asynchronous tasks, they are executed concurrently, and when they are all completed, all the results are merged together for subsequent processing.

A typical example would be to make multiple network requests at the same time (that is, remote API), wait for the data returned from all the requests, and then process the data together to update the UI. This reduces the total request time by making concurrent network requests.

We present sample code for the simplest of two concurrent network requests.

First, we need to define the asynchronous interface we need, the remote API interface.

/** * Http service request interface. */
public interface HttpService {
    /** * Initiates an HTTP request. *@paramApiUrl Requests URL *@paramRequest Request parameters (represented as Java beans) *@paramListener Callback listener *@paramContextData passthrough parameter *@param<T> Request Model type *@param<R> Response Model type */
    <T, R> void doRequest(String apiUrl, T request, HttpListener<? super T, R> listener, Object contextData);
}

/** * Listener interface for the Http service. **@param<T> Request Model type *@param<R> Response Model type */
public interface HttpListener <T.R> {
    /** * The callback interface that generates the request result (success or failure). *@paramApiUrl Requests URL *@paramRequest Request Model *@paramResult Request result (including response or error cause) *@paramContextData passthrough parameter */
    void onResult(String apiUrl, T request, HttpResult<R> result, Object contextData);
}Copy the code

Note that in the HttpService interface definition, the request parameter is defined using the Generic type T. If this interface had an implementation, the implementation code would use reflection to convert the actual incoming request type (which could be any Java Bean) into Http request parameters. Of course, we are only talking about interfaces here; implementation is not the point here.

The return result parameter is of type HttpResult, which allows it to express both successful and failed responses. HttpResult is defined as follows:

/** * HttpResult encapsulates the result of the Http request. ** When the server responds successfully, errorCode = SUCCESS and the server response is converted to response; * errorCode! When the server fails to respond successfully = SUCCESS, and the response value is invalid@param<R> Response Model type */
public class HttpResult <R> {
    /** * error code definition */
    public static final int SUCCESS = 0;/ / success
    public static final int REQUEST_ENCODING_ERROR = 1;Error encoding request
    public static final int RESPONSE_DECODING_ERROR = 2;// Error decoding the response
    public static final int NETWORK_UNAVAILABLE = 3;// The network is unavailable
    public static final int UNKNOWN_HOST = 4;// Domain name resolution failed
    public static final int CONNECT_TIMEOUT = 5;// Connection timed out
    public static final int HTTP_STATUS_NOT_OK = 6;// Download request returns a non-200
    public static final int UNKNOWN_FAILED = 7;// Other unknown errors

    private int errorCode;
    private String errorMessage;
    /** * response is a response returned by the server
    private R response;

    public int getErrorCode(a) {
        return errorCode;
    }

    public void setErrorCode(int errorCode) {
        this.errorCode = errorCode;
    }

    public String getErrorMessage(a) {
        return errorMessage;
    }

    public void setErrorMessage(String errorMessage) {
        this.errorMessage = errorMessage;
    }

    public R getResponse(a) {
        return response;
    }

    public void setResponse(R response) {
        this.response = response; }}Copy the code

HttpResult also contains a Generic type R, which is the type of response parameter returned when the request succeeds. Also, in a possible implementation of HttpService, reflection should again be used to transform the response content returned by the request (which could be a Json string) into type R (which could be any Java Bean).

Ok, now that we have the HttpService interface, we can demonstrate how to send two network requests simultaneously.

public class MultiRequestsDemoActivity extends AppCompatActivity {
    private HttpService httpService = new MockHttpService();
    /** * Caches the Map of each request */
    private Map<String, Object> httpResults = new HashMap<String, Object>();

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_multi_requests_demo);

        // Make two asynchronous requests simultaneously
        httpService.doRequest("http://...".new HttpRequest1(),
                new HttpListener<HttpRequest1, HttpResponse1>() {
                    @Override
                    public void onResult(String apiUrl, HttpRequest1 request, HttpResult
       
         result, Object contextData)
        {
                        // Cache the request result
                        httpResults.put("request-1", result);
                        if (checkAllHttpResultsReady()) {
                            // Both requests have ended
                            HttpResult<HttpResponse1> result1 = result;
                            HttpResult<HttpResponse2> result2 = (HttpResult<HttpResponse2>) httpResults.get("request-2");
                            if (checkAllHttpResultsSuccess()) {
                                // Both requests were successful
                                processData(result1.getResponse(), result2.getResponse());
                            }
                            else {
                                // Both requests are not completely successfulprocessError(result1.getErrorCode(), result2.getErrorCode()); }}}},null);
        httpService.doRequest("http://...".new HttpRequest2(),
                new HttpListener<HttpRequest2, HttpResponse2>() {
                    @Override
                    public void onResult(String apiUrl, HttpRequest2 request, HttpResult
       
         result, Object contextData)
        {
                        // Cache the request result
                        httpResults.put("request-2", result);
                        if (checkAllHttpResultsReady()) {
                            // Both requests have ended
                            HttpResult<HttpResponse1> result1 = (HttpResult<HttpResponse1>) httpResults.get("request-1");
                            HttpResult<HttpResponse2> result2 = result;
                            if (checkAllHttpResultsSuccess()) {
                                // Both requests were successful
                                processData(result1.getResponse(), result2.getResponse());
                            }
                            else {
                                // Both requests are not completely successfulprocessError(result1.getErrorCode(), result2.getErrorCode()); }}}},null);
    }

    /** * Check to see if all requests have results *@return* /
    private boolean checkAllHttpResultsReady(a) {
        int requestsCount = 2;
        for (int i = 1; i <= requestsCount; i++) {
            if (httpResults.get("request-" + i) == null) {
                return false; }}return true;
    }

    /** * check if all requests were successful *@return* /
    private boolean checkAllHttpResultsSuccess(a) {
        int requestsCount = 2;
        for (int i = 1; i <= requestsCount; i++) { HttpResult<? > result = (HttpResult<? >) httpResults.get("request-" + i);
            if (result == null|| result.getErrorCode() ! = HttpResult.SUCCESS) {return false; }}return true;
    }

    private void processData(HttpResponse1 data1, HttpResponse2 data2) {
        //TODO:Update the UI to display the result of the request. Omit the code here
    }

    private void processError(int errorCode1, int errorCode2) {
        //TODO:Update UI to show errors. Omit the code here}}Copy the code

We first wait for both requests to complete before combining their results. In order to determine whether both asynchronous requests are completed, we need to determine whether all requests have been returned at the time of either request callback. It is important to note that the onResult of HttpService is scheduled to execute on the main thread. We discussed the threading environment in which callbacks occur in our previous article, asynchronous processing in Android and iOS Development (PART 2) : Callbacks to asynchronous Tasks, in the section called “The Threading model for Callbacks.” If onResult is already scheduled for the main thread, the onResult callback sequence of the two requests can only be in two ways: execute the onResult of the first request and then execute the onResult of the second request; Or execute the onResult of the second request before executing the onResult of the first request. The judgment inside onResult in the above code is valid regardless of the order.

However, if the onResult of the HttpService is executed on different threads, the onResult callbacks of the two requests may be executed interchangeably, and the various judgments in the HttpService will also have synchronization problems.

Compared to the “sequential execution” mentioned earlier, the concurrent execution here obviously introduces a significant amount of complexity. If we don’t have a particularly strong need for performance gains from concurrency, we might prefer a sequential execution collaboration that keeps the code logic simple and understandable.

If multiple asynchronous tasks are executed concurrently, one takes precedence

“Concurrent execution, one priority” refers to that multiple asynchronous tasks are started at the same time and executed concurrently. However, different tasks have different priorities. At the end of the task execution, the result returned by the task with higher priority is preferred. If the higher-priority task finishes first, the lower-priority task that finishes later is ignored. If the lower-priority task completes first, the result of the higher-priority task overrides the result of the lower-priority task.

A classic example is page caching. For example, a page might display a dynamic list of data. If you only fetch list data from the server every time the page is opened, the page will be blank for a long time if there is no network or if the network is slow. It is usually better to display an old piece of data than nothing at all. Therefore, we might consider adding a local persistent cache to the list data.

The local cache is also an asynchronous task, and the interface code is defined as follows:

public interface LocalDataCache {
    /** * asynchronously obtain the locally cached HttpResponse object. *@param key
     * @paramCallback is used to return the cache object */
    void getCachingData(String key, AsyncCallback<HttpResponse> callback);

    /** * Save the HttpResponse object to the cache. *@param key
     * @paramData The HttpResponse object * to save@paramCallback returns success or failure as a result of the current save operation. */
    void putCachingData(String key, HttpResponse data, AsyncCallback<Boolean> callback);
}Copy the code

The data object cached by this local cache is an HttpResponse object fetched from the server earlier. The asynchronous callback interface, AsyncCallback, is described earlier.

This way, when the page opens, we can start both the local cache read task and the remote API request task. The latter has a higher priority than the former.

public class PageCachingDemoActivity extends AppCompatActivity {
    private HttpService httpService = new MockHttpService();
    private LocalDataCache localDataCache = new MockLocalDataCache();
    /** * Whether the data from the Http request has been returned */
    private boolean dataFromHttpReady;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_page_caching_demo);

        // Initiate both local data requests and remote Http requests
        final String userId = "xxx";
        localDataCache.getCachingData(userId, new AsyncCallback<HttpResponse>() {
            @Override
            public void onResult(HttpResponse data) {
                if(data ! =null && !dataFromHttpReady) {
                    // Old data in cache & old data is displayed before remote Http request is returnedprocessData(data); }}}); httpService.doRequest("http://...".new HttpRequest(),
                new HttpListener<HttpRequest, HttpResponse>() {
                    @Override
                    public void onResult(String apiUrl, HttpRequest request, HttpResult
       
         result, Object contextData)
        {
                        if (result.getErrorCode() == HttpResult.SUCCESS) {
                            dataFromHttpReady = true;
                            processData(result.getResponse());
                            // Pull the latest data from Http to update the local cache
                            localDataCache.putCachingData(userId, result.getResponse(), null);
                        }
                        else{ processError(result.getErrorCode()); }}},null);
    }


    private void processData(HttpResponse data) {
        //TODO:Update the UI to display data. Omit the code here
    }

    private void processError(int errorCode) {
        //TODO:Update UI to show errors. Omit the code here}}Copy the code

While reading locally cached data is generally much faster than fetching data from the network, since both are asynchronous interfaces, there is a logical possibility that the network fetching data will be called back before the locally cached data. Furthermore, the “early failure result callbacks” and “early success result callbacks” mentioned in the callback Order section of our previous article “Asynchronous Processing in Android and iOS Development (PART 2) — Callbacks to Asynchronous Tasks” provide a more realistic basis for this to happen.

In the above code, we record a Boolean flag dataFromHttpReady if the network fetched data before the local cached data callback. When the task of fetching the locally cached data is complete, we judge this flag and ignore the cached data.

In the case of page caching alone, the “concurrent execution, one party first” approach here does not provide significant performance gains because there is generally a significant difference in the execution time required to read locally cached data and fetch data from the network. This means that if we changed the page caching example to a sequential execution implementation, we might get the simplicity of the code logic without sacrificing too much performance.

Of course, if you decide to use the “concurrent execution, one party first” asynchronous task collaboration described in this section, be sure to consider all possible execution orders for asynchronous task callbacks.

Use RxJava ZIP to implement concurrent network requests

So far, we have not used any tools to deal with the various cooperative relationships that occur when multiple asynchronous tasks are executed, which is a “hand-to-hand combat” situation. The next step in this section is to introduce a heavy weapon, RxJava, to see if it can make a difference in the complexity of asynchronous problems on Android.

Let’s take the second scenario, “concurrent network requests,” as an example.

In RxJava, there is a ZIP operation built on top of the Lift operation that merges data from multiple Observables into a new Observable. This is exactly the feature needed for the “concurrent network request” scenario.

We can think of two concurrent network requests as two Observables and combine their results using the ZIP operation. That seems like a lot of simplification. First, however, we need to address another issue: encapsulating the asynchronous network request interface that HttpService represents as an Observable.

Generally speaking, it is easy to encapsulate a synchronous task as an Observable, while it is not intuitive to encapsulate a ready-made asynchronous task as an Observable. We need to use AsyncOnSubscribe.

public class MultiRequestsDemoActivity extends AppCompatActivity {
    private HttpService httpService = new MockHttpService();

    private TextView apiResultDisplayTextView;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_multi_requests_demo);

        apiResultDisplayTextView = (TextView) findViewById(R.id.api_result_display);

        /** * Wrap the two requests into two Observables */ based on the AsyncOnSubscribe mechanism

        Observable<HttpResponse1> request1 = Observable.create(new AsyncOnSubscribe<Integer, HttpResponse1>() {
            @Override
            protected Integer generateState(a) {
                return 0;
            }

            @Override
            protected Integer next(Integer state, long requested, Observer<Observable<? extends HttpResponse1>> observer) {
                final Observable<HttpResponse1> asyncObservable = Observable.create(new Observable.OnSubscribe<HttpResponse1>() {
                    @Override
                    public void call(final Subscriber<? super HttpResponse1> subscriber) {
                        // Initiates the first asynchronous request
                        httpService.doRequest("http://...".new HttpRequest1(),
                                new HttpListener<HttpRequest1, HttpResponse1>() {
                                    @Override
                                    public void onResult(String apiUrl, HttpRequest1 request, HttpResult<HttpResponse1> result, Object contextData) {
                                        // The first asynchronous request completes, sending the result to asyncObservable
                                        if (result.getErrorCode() == HttpResult.SUCCESS) {
                                            subscriber.onNext(result.getResponse());
                                            subscriber.onCompleted();
                                        }
                                        else {
                                            subscriber.onError(new Exception("request1 failed")); }}},null); }}); observer.onNext(asyncObservable); observer.onCompleted();return 1; }}); Observable<HttpResponse2> request2 = Observable.create(new AsyncOnSubscribe<Integer, HttpResponse2>() {
            @Override
            protected Integer generateState(a) {
                return 0;
            }

            @Override
            protected Integer next(Integer state, long requested, Observer<Observable<? extends HttpResponse2>> observer) {
                final Observable<HttpResponse2> asyncObservable = Observable.create(new Observable.OnSubscribe<HttpResponse2>() {
                    @Override
                    public void call(final Subscriber<? super HttpResponse2> subscriber) {
                        // Start the second asynchronous request
                        httpService.doRequest("http://...".new HttpRequest2(),
                                new HttpListener<HttpRequest2, HttpResponse2>() {
                                    @Override
                                    public void onResult(String apiUrl, HttpRequest2 request, HttpResult<HttpResponse2> result, Object contextData) {
                                        // The second asynchronous request ends, sending the result to asyncObservable
                                        if (result.getErrorCode() == HttpResult.SUCCESS) {
                                            subscriber.onNext(result.getResponse());
                                            subscriber.onCompleted();
                                        }
                                        else {
                                            subscriber.onError(new Exception("reques2 failed")); }}},null); }}); observer.onNext(asyncObservable); observer.onCompleted();return 1; }});// Combine the results of two Observable requests with zip
        Observable.zip(request1, request2, new Func2<HttpResponse1, HttpResponse2, List<Object>>() {
            @Override
            public List<Object> call(HttpResponse1 response1, HttpResponse2 response2) {
                List<Object> responses = new ArrayList<Object>(2);
                responses.add(response1);
                responses.add(response2);
                return responses;
            }
        }).subscribe(new Subscriber<List<Object>>() {
            private HttpResponse1 response1;
            private HttpResponse2 response2;

            @Override
            public void onNext(List<Object> responses) {
                response1 = (HttpResponse1) responses.get(0);
                response2 = (HttpResponse2) responses.get(1);
            }

            @Override
            public void onCompleted(a) {
                processData(response1, response2);
            }

            @Override
            public void onError(Throwable e) { processError(e); }}); }private void processData(HttpResponse1 data1, HttpResponse2 data2) {
        //TODO:Update the UI to display data. Omit the code here
    }

    private void processError(Throwable e) {
        //TODO:Update UI to show errors. Omit the code here
    }Copy the code

By introducing RxJava, we simplify the logic for determining the end of asynchronous task execution, but spend most of our effort on “encapsulating HttpService as an Observable.” As we said, RxJava is a “heavy weapon” that can accomplish far more than is needed here. RxJava used here, inevitably give a person “kill chicken with a knife” feeling.

For the other two types of asynchronous task collaboration: “sequential execution” and “concurrent execution, one takes precedence”, if you want to use RxJava to solve the problem, you also need to become an expert in RxJava first to be able to do this well.

In the case of sequential execution, which is simple enough by itself, it is even simpler not to introduce another framework. Sometimes, we may prefer simple processing logic, so the execution of multiple asynchronous tasks as “sequential execution” is also a solution. Although this will hurt some performance.


In this article, we discussed three types of multi-asynchronous task collaboration, but we don’t want to make the decision to change the execution of multiple asynchronous tasks to sequential execution to simplify the processing logic. The trade-off is still up to the developer.

Also, it’s important to note that, in many cases, the choice is not ours, and the code architecture we’re given may have created all sorts of asynchronous task collaborations. What we need to do is to always be able to keep a cool head when this happens, and to recognize and understand which situation we are in from the complicated code logic.

(after)

Other selected articles:

  • Asynchronous processing in Android and iOS development (part 2) — Callback for asynchronous tasks
  • Authentic technology with wild way
  • How annoying is Android push?
  • Manage App numbers and red dot tips with a tree model
  • A diagram to read thread control in RxJava
  • The descriptor of the End of the Universe (2)
  • Redis internal data structure details (5) – QuickList
  • Redis internal data structure (4) – Ziplist
  • The programmer’s cosmic timeline
  • Redis internal data structure (1) – dict