Learn RxJS by implementing elegant resumable chunked file upload in 200 lines of code

This article is the third in a series introducing RxJS. The series starts with a small example and gradually goes deeper into applying RxJS in various scenarios, with explanations of the various RxJS operators along the way. This article follows the second one, "Connect the world with RxJS", and implements a resumable chunked file upload example. In this example we will use more RxJS operators to handle our business logic; the functions and usage scenarios of these operators will be explained in more detail in future articles.

Intro

Ben Lesh often describes RxJS as "Lodash for async" in his talks to highlight its powerful asynchronous control capabilities, and RxJS is indeed as powerful for async as Lodash is for arrays. Like Lodash, RxJS also performs well: it handles asynchronous tasks without sacrificing much performance for its high level of abstraction. This article takes a relatively complex asynchronous task as an example and shows, step by step, how RxJS handles complex asynchronous control simply and elegantly.

Preparation

Clone the seed project learning-RxJS and check out your own article3 branch based on the article3-seed branch. All RxJS code in this article is written in TypeScript.

In this article, we will use RxJS to do the following:

The article3-seed branch comes with a simple file upload server that implements a simple chunked file upload API.

The server provides three APIs:

  • post /api/upload/chunk

    Used to obtain the chunk information for a file. The client sends the file size, file MD5, last-modified time, and file name

    The server returns the number of chunks, the size of each chunk, and the unique fileKey of the file

    • Request body:

    {
      fileSize: string // file size
      fileMD5: string // file MD5
      lastUpdated: ISOString // time when the file was last modified
      fileName: string // file name
    }
    • Response:

    {
      chunkSize: number // size of each chunk
      chunks: number // number of chunks
      fileKey: string // unique file ID
    }
  • post /api/upload/chunk/:fileKey?chunk=:chunk&chunks=:chunks

    Used to upload a single file chunk

    • Request header: ‘Content-Type’: ‘application/octet-stream’

    • Request body: blob

    • Response: ‘ok’ or error message

  • post /api/upload/chunk/:fileKey

    Settles the uploaded chunks: the back end joins the chunks into a complete file and then responds

    Response: ‘ok’ or error message
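As a rough illustration of how the three endpoints relate, the sketch below builds their URLs from a fileKey and the chunk numbers. The helper names (chunkInfoUrl, chunkUploadUrl, settleUrl) are made up for this example and are not part of the seed project; the host is assumed to be the local server address used later in this article.

```typescript
// Hypothetical helpers, not part of the seed project.
// apiHost is assumed to be the local upload server used later in the article.
const apiHost = 'http://127.0.0.1:5000/api'

// post /api/upload/chunk : ask the server for the chunk information
function chunkInfoUrl(): string {
  return `${apiHost}/upload/chunk`
}

// post /api/upload/chunk/:fileKey?chunk=:chunk&chunks=:chunks : upload one chunk
function chunkUploadUrl(fileKey: string, chunk: number, chunks: number): string {
  return `${apiHost}/upload/chunk/${encodeURIComponent(fileKey)}?chunk=${chunk}&chunks=${chunks}`
}

// post /api/upload/chunk/:fileKey : settle the uploaded chunks into one file
function settleUrl(fileKey: string): string {
  return `${apiHost}/upload/chunk/${encodeURIComponent(fileKey)}`
}

console.log(chunkInfoUrl())
console.log(chunkUploadUrl('abc123', 1, 26))
console.log(settleUrl('abc123'))
```

Note that the chunk upload URL and the settle URL differ only in the query string, so the query parameters must be written without any stray whitespace.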

Based on these APIs, we will implement the following functionality in this article:

  1. Add a button to the left of the Add button, used to select a file and to pause and resume the upload

  2. After adding files:

    • Calculate the file's MD5, file name, last-modified time, and file size

    • Call the post /api/upload/chunk interface to obtain the chunk information

    • Slice the file according to the chunk information, and call post /api/upload/chunk/:fileKey?chunk=:chunk&chunks=:chunks to upload the chunks. At most three chunks may be uploading at the same time

    • After all chunks are uploaded, call post /api/upload/chunk/:fileKey to settle the file


  3. During the upload process, a progress bar displays the upload progress under the input box

  4. After the upload starts, the button to select the file becomes the pause button, which pauses the upload when clicked

  5. After the pause button is clicked, it becomes a resume button; clicking it resumes the upload from where it was paused

To implement the above functionality, and to keep it separate from the previous Todo app, we create a new FileUploader.ts file in the src folder and implement the requirements in this file:

// FileUploader.ts
import { Observable } from 'rxjs'

// @warn memory leak
const $attachment = document.querySelector('.attachment')

export class FileUploader {

  private file$ = Observable.fromEvent($attachment, 'change')
    .map((r: Event) => (r.target as HTMLInputElement).files[0])
    .filter(f => !!f)

  uploadStream$ = this.file$
}

Add the attachment node to HTML:

// index.html
...
<div class="input-group-btn">
  <label class="btn btn-default btn-file glyphicon glyphicon-paperclip attachment">
    <input type="file" style="display: none;">
  </label>
  <div class="btn btn-default button-add">Add</div>
</div>
...

Adjust the style:

// style.css
...
.attachment {
  top: 0;
}

Then merge the stream we are going to implement into app$ in app.ts:

...
import { FileUploader } from './FileUploader'
...
const uploader = new FileUploader()

const app$ = toggle$.merge(remove$, search$, uploader.uploadStream$)
  .do(r => {
    console.log(r)
  })

app$.subscribe()

If you now select a file with the attachment button, you can already see the file flow out of app$ in the console.




Obtaining the chunk information

We use FileReader + spark-md5 to calculate the MD5 of the file; the other information can be read directly from the File object. Because FileReader reads the file asynchronously, we wrap it in an Observable so that it can be combined with uploadStream$:

import { Observable, Observer } from 'rxjs'
// spark-md5 is loaded here with a CommonJS-style require
// If noImplicitAny is not set in tsconfig.json, this form can be used instead:
// import * as SparkMD5 from 'spark-md5'
const SparkMD5 = require('spark-md5')

const $attachment = document.querySelector('.attachment')

interface FileInfo {
  fileSize: number
  fileMD5: string
  lastUpdated: string
  fileName: string
}

export class FileUploader {

  private file$ = Observable.fromEvent($attachment, 'change')
    .map((r: Event) => (r.target as HTMLInputElement).files[0])
    .filter(f => !!f)

  uploadStream$ = this.file$
    .switchMap(this.readFileInfo)

  // Reads the whole file, computes its MD5, and emits the file together with its FileInfo
  private readFileInfo(file: File): Observable<{ file: File, fileinfo: FileInfo }> {
    const reader = new FileReader()
    const spark = new SparkMD5.ArrayBuffer()
    reader.readAsArrayBuffer(file)
    return Observable.create((observer: Observer<{ file: File, fileinfo: FileInfo }>) => {
      reader.onload = (e: Event) => {
        spark.append((e.target as FileReader).result)
        const fileMD5 = spark.end()
        observer.next({
          file, fileinfo: {
            fileMD5, fileSize: file.size,
            lastUpdated: file.lastModifiedDate.toISOString(),
            fileName: file.name
          }
        })
        observer.complete()
      }
      // Teardown: abort the read if it has not finished when unsubscribed
      return () => {
        if (!reader.result) {
          console.warn('read file aborted')
          reader.abort()
        }
      }
    })
  }
}

Now the FileInfo of the file flows out of app$.
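The readFileInfo method reads the modification time from file.lastModifiedDate, which is a deprecated, non-standard property; the standard File API exposes File.lastModified as milliseconds since the Unix epoch. The following is a minimal sketch of building the same FileInfo payload from lastModified. The FileLike type and the buildFileInfo helper are hypothetical names introduced only for this illustration, and the MD5 string is assumed to be computed elsewhere (for example by spark-md5 as above).

```typescript
interface FileInfo {
  fileSize: number
  fileMD5: string
  lastUpdated: string
  fileName: string
}

// Hypothetical minimal shape: the subset of the browser File object used here.
interface FileLike {
  name: string
  size: number
  lastModified: number // milliseconds since the Unix epoch
}

// Builds the request payload; fileMD5 is assumed to be computed elsewhere.
function buildFileInfo(file: FileLike, fileMD5: string): FileInfo {
  return {
    fileMD5,
    fileSize: file.size,
    lastUpdated: new Date(file.lastModified).toISOString(),
    fileName: file.name
  }
}

// Made-up sample values for illustration only.
const info = buildFileInfo({ name: 'demo.bin', size: 1024, lastModified: 0 }, 'placeholder-md5')
console.log(info.lastUpdated) // 1970-01-01T00:00:00.000Z
```

Because a browser File object has name, size, and lastModified properties, it can be passed to such a helper directly.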




Then use the file information to obtain the chunk information through the post /api/upload/chunk interface:

...
const apiHost = 'http://127.0.0.1:5000/api'
...
interface ChunkMeta {
  fileSize: number
  chunkSize: number
  chunks: number
  fileKey: string
}
...
export class FileUploader {
  ...
  uploadStream$ = this.file$
    .switchMap(this.readFileInfo)
    .switchMap(i => Observable.ajax
      .post(`${apiHost}/upload/chunk`, i.fileinfo)
    )
}




Uploading the chunks

After getting the chunk information, the first thing we need to do is slice the file accordingly. We add a slice method for this:

...
export class FileUploader {
  ...
  uploadStream$ = this.file$
    .switchMap(this.readFileInfo)
    .switchMap(i => Observable.ajax
      .post(`${apiHost}/upload/chunk`, i.fileinfo)
      .map((r) => {
        const blobs = this.slice(i.file, r.response.chunks, r.response.chunkSize)
        return { blobs, chunkMeta: r.response }
      })
    )
  ...
  private slice(file: File, n: number, chunkSize: number): Blob[] {
    const result: Blob[] = []
    for (let i = 0; i < n; i ++) {
      const startSize = i * chunkSize
      // The last chunk ends at the end of the file
      const slice = file.slice(startSize, i === n - 1 ? startSize + (file.size - startSize) : (i + 1) * chunkSize)
      result.push(slice)
    }
    return result
  }
}

At this point, the sliced blobs and the chunk meta information flow out of app$.
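The boundary arithmetic in slice can be checked on plain numbers, without a real File. The sketch below uses a hypothetical chunkRanges helper (not part of the article's code) that computes the same start and end offsets, and feeds it the fileSize, chunks, and chunkSize values from the sample UPLOAD_SUCCESS payload shown later in this article.

```typescript
// Hypothetical helper: the same boundary arithmetic as slice(), on plain numbers.
function chunkRanges(fileSize: number, n: number, chunkSize: number): Array<[number, number]> {
  const ranges: Array<[number, number]> = []
  for (let i = 0; i < n; i++) {
    const start = i * chunkSize
    // The last chunk ends at the end of the file; every other chunk ends
    // where the next one starts.
    const end = i === n - 1 ? fileSize : (i + 1) * chunkSize
    ranges.push([start, end])
  }
  return ranges
}

// 26621938 bytes split into 26 chunks of 1048576 bytes each.
const ranges = chunkRanges(26621938, 26, 1048576)
console.log(ranges[0].join('-'))  // 0-1048576
console.log(ranges[25].join('-')) // 26214400-26621938
```

The last chunk is shorter than the others (407538 bytes here), which is why the final index is treated specially.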




After slicing the file, we need to implement a method for uploading the chunks:

...
export class FileUploader {
  ...
  uploadStream$ = this.file$
    .switchMap(this.readFileInfo)
    .switchMap(i => Observable.ajax
      .post(`${apiHost}/upload/chunk`, i.fileinfo)
      .map((r) => {
        const blobs = this.slice(i.file, r.response.chunks, r.response.chunkSize)
        return { blobs, chunkMeta: r.response }
      })
    )
    .switchMap(({ blobs, chunkMeta }) => {
      const dists = blobs.map((blob, index) => this.uploadChunk(chunkMeta, index, blob))
      const uploadStream = Observable.from(dists)
        .mergeAll(this.concurrency)

      return Observable.forkJoin(uploadStream)
        .mapTo(chunkMeta)
    })

  constructor(
    private concurrency = 3
  ) { }
  ...
  // Uploads a single chunk; chunk numbers sent to the server start at 1
  private uploadChunk(meta: ChunkMeta, index: number, blob: Blob) {
    const host = `${apiHost}/upload/chunk/${meta.fileKey}?chunk=${index + 1}&chunks=${meta.chunks}`
    return Observable.ajax({
      url: host,
      body: blob,
      method: 'post',
      crossDomain: true,
      headers: { 'Content-Type': 'application/octet-stream' }
    })
  }
}

uploadChunk is the method that uploads a single chunk. The logic in the last switchMap of uploadStream$ uses the mergeAll operator to merge all the upload streams into one Observable, so the chunks are uploaded concurrently. forkJoin waits for the merged uploadStream to complete before emitting a result. Combining mergeAll with forkJoin here behaves much like Promise.all, and without a concurrency limit the code could also be written as:

...
const dists = blobs.map((blob, index) => this.uploadChunk(chunkMeta, index, blob))

return Observable.forkJoin(...dists)
  .mapTo(chunkMeta)
...

However, we have a requirement that only three chunks may be uploading at the same time, so we use mergeAll and pass concurrency = 3 to limit the number of concurrent uploads. Now we can choose a file and watch the upload behavior in DevTools. If the application works correctly, the chunks are uploaded concurrently with at most 3 uploads in flight at any time, and the chunkMeta data flows out of app$ after all the chunks are uploaded.
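To make the effect of the concurrency limit more concrete, here is a small synchronous model of the scheduling rule: start up to concurrency tasks, and start the next queued task whenever a running one finishes. This is not RxJS code and does not reproduce the internals of mergeAll; it only simulates the observable behavior described above. The function names and the task durations are made up for this illustration.

```typescript
// A synchronous model (not RxJS itself) of scheduling under a concurrency limit.
interface Scheduled {
  index: number
  start: number
  end: number
}

function simulateConcurrency(durations: number[], concurrency: number): Scheduled[] {
  const result: Scheduled[] = []
  const running: Scheduled[] = [] // tasks currently occupying a slot
  let clock = 0
  durations.forEach((duration, index) => {
    if (running.length === concurrency) {
      // All slots are busy: wait for the earliest finish and free that slot.
      running.sort((a, b) => a.end - b.end)
      const finished = running.shift() as Scheduled
      clock = Math.max(clock, finished.end)
    }
    const task: Scheduled = { index, start: clock, end: clock + duration }
    running.push(task)
    result.push(task)
  })
  return result
}

// Highest number of tasks whose [start, end) intervals overlap at any start time.
function maxInFlight(tasks: Scheduled[]): number {
  return tasks.reduce((max, t) => {
    const overlapping = tasks.filter(o => o.start <= t.start && t.start < o.end).length
    return Math.max(max, overlapping)
  }, 0)
}

// Five hypothetical chunk uploads with made-up durations (arbitrary time units).
const schedule = simulateConcurrency([4, 2, 3, 1, 5], 3)
console.log(schedule.map(t => `${t.index}@${t.start}`).join(' ')) // 0@0 1@0 2@0 3@2 4@3
console.log(maxInFlight(schedule)) // 3
```

The first three tasks start immediately, the fourth starts only when the shortest of them finishes, and no more than three are ever in flight, which is the behavior to look for in the network panel.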

Finally, we just need to settle the chunks, and the file upload is complete:

...
export class FileUploader {
  ...
  uploadStream$ = this.file$
    .switchMap(this.readFileInfo)
    .switchMap(i => Observable.ajax
      .post(`${apiHost}/upload/chunk`, i.fileinfo)
      .map((r) => {
        const blobs = this.slice(i.file, r.response.chunks, r.response.chunkSize)
        return { blobs, chunkMeta: r.response }
      })
    )
    .switchMap(({ blobs, chunkMeta }) => {
      const dists = blobs.map((blob, index) => this.uploadChunk(chunkMeta, index, blob))
      const uploadStream = Observable.from(dists)
        .mergeAll(this.concurrency)

      return Observable.forkJoin(uploadStream)
        .mapTo(chunkMeta)
    })
    .switchMap((r: ChunkMeta) => Observable.ajax.post(`${apiHost}/upload/chunk/${r.fileKey}`)
      .mapTo({
        action: 'UPLOAD_SUCCESS',
        payload: r
      })
    )
}

Now, after selecting a file, you can see it being sliced and uploaded, and a value is emitted from app$ once the chunks have been settled:

{
  "action": "UPLOAD_SUCCESS",
  "payload": {
    "chunkSize": 1048576,
    "chunks": 26,
    "fileKey": "00a12bdc10449d8ec93883a7d45292a30c",
    "fileSize": 26621938
  }
}

The settled file can be found under the Chunks folder of the project.

The progress bar

To display the progress bar in real time, we first add the progress bar markup to index.html:

// index.html
...
 <div class="progress">
   <div class="progress-bar progress-bar-success" role="progressbar" aria-valuenow="0" aria-valuemin="0" aria-valuemax="100" style="width: 0%">
     <span>0%</span>
   </div>
</div>
...

Adjust the color of the text in the style:

// style.css
...
.progress-bar > span {
  color: black;
}

The interface should look something like this:




To get the overall upload progress, we must first get the upload progress of each single chunk. Observable.ajax provides a way to get the progress:

...
import { Observable, Observer, Subscriber } from 'rxjs'
...
export class FileUploader {
  ...
  private uploadChunk(meta: ChunkMeta, index: number, blob: Blob): Observable<ProgressEvent> {
    const host = `${apiHost}/upload/chunk/${meta.fileKey}?chunk=${index + 1}&chunks=${meta.chunks}`
    return Observable.create((subscriber: Subscriber<ProgressEvent>) => {
      const ajax$ = Observable.ajax({
        url: host,
        body: blob,
        method: 'post',
        crossDomain: true,
        headers: { 'Content-Type': 'application/octet-stream' },
        // Progress events of this request are pushed to the outer subscriber
        progressSubscriber: subscriber
      })
      const subscription = ajax$.subscribe()
      return () => subscription.unsubscribe()
    })
  }
}

This allows us to calculate the overall upload progress in uploadStream$:

...
export class FileUploader {
  uploadStream$ = this.file$
    .switchMap(this.readFileInfo)
    .switchMap(i => Observable.ajax
      .post(`${apiHost}/upload/chunk`, i.fileinfo)
      .map((r) => {
        const blobs = this.slice(i.file, r.response.chunks, r.response.chunkSize)
        return { blobs, chunkMeta: r.response }
      })
    )
    .switchMap(({ blobs, chunkMeta }) => {
      // uploaded[index] holds the fraction of the whole file uploaded by chunk index
      const uploaded: number[] = []
      const dists = blobs.map((blob, index) => {
        let currentLoaded = 0
        return this.uploadChunk(chunkMeta, index, blob)
          .do(r => {
            currentLoaded = r.loaded / chunkMeta.fileSize
            uploaded[index] = currentLoaded
            const percent = uploaded.reduce((acc, val) => acc + (val ? val : 0))
            const p = Math.round(percent * 100)
            $progressBar.style.width = `${p}%`
            $progressBar.firstElementChild.textContent = `${p > 1 ? p - 1 : p} %`
          })
      })

      const uploadStream = Observable.from(dists)
        .mergeAll(this.concurrency)

      return Observable.forkJoin(uploadStream)
        .mapTo(chunkMeta)
    })
    .switchMap((r: ChunkMeta) => Observable.ajax.post(`${apiHost}/upload/chunk/${r.fileKey}`)
      .mapTo({
        action: 'UPLOAD_SUCCESS',
        payload: r
      })
    )
    .do(() => {
      $progressBar.firstElementChild.textContent = '100 %'
    })
}

At this point, we can see the progress of uploading file fragments in the interface.
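The aggregation step can be isolated as a pure function. The sketch below is a variation on the code above, not a copy of it: it stores the uploaded bytes per chunk (instead of per-chunk fractions) and divides once at the end. The totalProgress name and the sample numbers are hypothetical.

```typescript
// Hypothetical helper: overall progress as a fraction between 0 and 1.
// loadedBytes[i] is the number of bytes uploaded so far for chunk i;
// entries for chunks that have not reported progress yet are undefined.
function totalProgress(loadedBytes: Array<number | undefined>, fileSize: number): number {
  // The explicit initial value 0 keeps reduce safe for an empty array.
  const loaded = loadedBytes.reduce<number>((acc, val) => acc + (val || 0), 0)
  return loaded / fileSize
}

// A 1000-byte file in three chunks: the first is done, the second is a quarter
// of the way through, and the third has not started.
const fraction = totalProgress([400, 100, undefined], 1000)
console.log(Math.round(fraction * 100)) // 50
```

Keeping the calculation free of DOM access makes it easy to check in isolation, while the do operator stays responsible only for writing the result into the progress bar.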

In general, for ease of use and debugging, we put all local state shaped like:

{
  action: 'UPLOAD_SUCCESS',
  payload: {
    chunkSize: 1048576,
    chunks: 26,
    fileKey: "00a12bdc10449d8ec93883a7d45292a30c",
    fileSize: 26621938
  }
}

into a single stream:

import { Observable, Subscriber, Subject } from 'rxjs'
...
type Action = 'pause' | 'resume' | 'progress' | 'complete'
...
export class FileUploader {
  ...
  private action$ = new Subject<{
    name: Action
    payload?: any
  }>()

  private progress$ = this.action$
    .filter(action => action.name === 'progress')
    .map(action => action.payload)
    .do(r => {
      const percent = Math.round(r * 100)
      $progressBar.style.width = `${percent}%`
      $progressBar.firstElementChild.textContent = `${percent > 1 ? percent - 1 : percent} %`
    })
    .map(r => ({ action: 'PROGRESS', payload: r }))

  uploadStream$ = this.file$
    ...
        return this.uploadChunk(chunkMeta, index, blob)
          .do(r => {
            currentLoaded = r.loaded / chunkMeta.fileSize
            uploaded[index] = currentLoaded
            const percent = uploaded.reduce((acc, val) => acc + (val ? val : 0))
            // Publish the progress as an action instead of touching the DOM here
            this.action$.next({ name: 'progress', payload: percent })
          })
    ...
    .merge(this.progress$)
}

More intuitive debugging information appears on the console:




Pause and resume

After a file is selected, the button for selecting a file becomes a pause button. We can use Observable.fromEvent to implement this:

...
export class FileUploader {
  ...
  private click$ = Observable.fromEvent($attachment, 'click')
    .map((e: Event) => e.target)
    .filter((e: HTMLElement) => e === $attachment)
    // 1: select a file, 2: pause, 3: resume
    .scan((acc: number, val: HTMLElement) => {
      if (val.classList.contains('glyphicon-paperclip')) {
        return 1
      }
      if (acc === 2) {
        return 3
      }
      return 2
    }, 3)
    .filter(v => v !== 1)
    .do((v) => {
      if (v === 2) {
        this.action$.next({ name: 'pause' })
        $attachment.classList.remove('glyphicon-pause')
        $attachment.classList.add('glyphicon-play')
      } else {
        this.action$.next({ name: 'resume' })
        this.buildPauseIcon()
      }
    })

  uploadStream$ = this.file$
    .switchMap...
    .switchMap...
    .do(() => this.buildPauseIcon())
    ...
    .do(() => {
      $progressBar.firstElementChild.textContent = '100 %'
      // restore icon
      $attachment.classList.remove('glyphicon-pause')
      $attachment.classList.add('glyphicon-paperclip');
      ($attachment.firstElementChild as HTMLInputElement).disabled = false
    })
    .merge(this.progress$, this.click$)

  // side effect
  private buildPauseIcon() {
    $attachment.classList.remove('glyphicon-paperclip')
    $attachment.classList.add('glyphicon-pause');
    ($attachment.firstElementChild as HTMLInputElement).disabled = true
  }
}

This code involves quite a few concepts, so let's go through it step by step:

First, a do operator is inserted after the first two switchMaps of uploadStream$. It changes the file upload icon to a pause icon.

We then create a new click$ stream. To avoid pushing duplicate values caused by event bubbling, we use map + filter to drop click events that bubble up from child nodes. To distinguish whether the button was clicked as a file-selection, pause, or resume button, we use the values 1, 2, and 3 to represent the three different click events, and use the scan operator to keep producing these three states. scan behaves much like Array#reduce: it takes an accumulator function that keeps deriving a new state from the current value and the previous state (yes, the same behavior as a reducer in Redux). In the do operator that follows, we change the icon of the button according to the state.
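The accumulator passed to scan can be studied as a pure function, without any DOM. In the sketch below, the Icon type is a hypothetical stand-in for the icon class found on the button at the time of the click, and scanStates is a made-up helper that collects every intermediate state the way scan emits them (Array#reduce would only return the last one).

```typescript
// 1 = select a file, 2 = pause, 3 = resume (the same numbering as in click$)
type ClickState = 1 | 2 | 3

// Hypothetical stand-in for the icon class currently on the button.
type Icon = 'glyphicon-paperclip' | 'glyphicon-pause' | 'glyphicon-play'

// The accumulator from click$, rewritten as a pure function.
function nextState(acc: ClickState, icon: Icon): ClickState {
  if (icon === 'glyphicon-paperclip') {
    return 1
  }
  return acc === 2 ? 3 : 2
}

// Collects every intermediate state, as scan would emit them one by one.
function scanStates(icons: Icon[], seed: ClickState): ClickState[] {
  const states: ClickState[] = []
  let acc: ClickState = seed
  icons.forEach(icon => {
    acc = nextState(acc, icon)
    states.push(acc)
  })
  return states
}

// Select a file, pause, resume, pause again.
const clicks: Icon[] = ['glyphicon-paperclip', 'glyphicon-pause', 'glyphicon-play', 'glyphicon-pause']
console.log(scanStates(clicks, 3).join(',')) // 1,2,3,2
```

State 1 is later removed by filter, so only the pause (2) and resume (3) states reach the do operator that dispatches the actions.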

If we now watch the upload and click pause/resume, the icon switches correctly. After the upload completes, the icon is restored to its original file-selection state.







To allow the whole file upload to be paused and resumed, we use the takeUntil, repeatWhen, and retryWhen operators in uploadChunk:

...
export class FileUploader {
  ...
  private action$ = ...
  private pause$ = this.action$.filter(ac => ac.name === 'pause')
  private resume$ = this.action$.filter(ac => ac.name === 'resume')

  private progress$ = this.action$
    ...
    .distinctUntilChanged((x: number, y: number) => x - y >= 0)
    ...
  ...
  private uploadChunk(meta: ChunkMeta, index: number, blob: Blob): Observable<ProgressEvent> {
    ...
    return Observable.create(
      ...
      const ajax$ = Observable.ajax({
        ...
      })
        .takeUntil(this.pause$)
        .repeatWhen(() => this.resume$)
      const subscription = ajax$.subscribe()
      return () => subscription.unsubscribe()
    )
      .retryWhen(() => this.resume$)
  }
}

The takeUntil operator takes an Observable and completes the source stream when that Observable emits a value.

The repeatWhen and retryWhen operators both accept a function that returns an Observable, and they repeat or retry the source when that Observable emits a value.

During pause and resume, the progress bar may display a wrong number: when a partially uploaded request is aborted, its progress has already been counted once, so when that chunk is uploaded again from the start, the progress bar may fall back. We can emit a value only when the progress increases by adding distinctUntilChanged to progress$.
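The comparer passed to distinctUntilChanged receives the last emitted value and the current value, and returning true means "treat the current value as unchanged", so it is dropped. The following plain-array model (not RxJS itself; the function name and the sample progress values are made up) shows why the comparer x - y >= 0 only lets increasing values through.

```typescript
// A plain-array model (not RxJS itself) of distinctUntilChanged with a custom comparer.
// compare(prev, curr) returning true means the current value is dropped.
function distinctUntilChangedModel<T>(values: T[], compare: (prev: T, curr: T) => boolean): T[] {
  const emitted: T[] = []
  values.forEach(value => {
    const isFirst = emitted.length === 0
    // The comparison is made against the last value that was actually emitted.
    if (isFirst || !compare(emitted[emitted.length - 1], value)) {
      emitted.push(value)
    }
  })
  return emitted
}

// The comparer used on progress$: drop the value unless it is larger than the last one.
const notIncreased = (x: number, y: number) => x - y >= 0

// Hypothetical progress values; 0.2 models the fallback after a paused chunk restarts.
const shown = distinctUntilChangedModel([0.1, 0.3, 0.2, 0.3, 0.5], notIncreased)
console.log(shown.join(' ')) // 0.1 0.3 0.5
```

The fallback to 0.2 and the repeated 0.3 are both suppressed, so the progress bar never moves backwards.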

Conclusion

This is a highly abstract article. Because no framework is used, the program relies on many side effects to manipulate the DOM, so it does not look particularly elegant overall. A truly elegant FRP approach would combine RxJS with frameworks such as Redux + React, so that the file upload component could be written more elegantly. The example is also incomplete: many edge cases, such as exception handling at each step, are not covered. That does not matter here, since the goal is only to demonstrate the power of RxJS in handling asynchronous work and to give beginners a chance to play with various RxJS operators and implement a complex asynchronous scenario. Later articles will delve into the operators covered or not covered in the first three articles, gradually clearing the fog around RxJS.