When we obtain a firmware image, we usually want some basic information about it first: its architecture, its kernel version, whether a file system can be extracted, and so on. The FirmAE extractor relies on Binwalk for extraction. My own earlier extraction methods were too simple, and this script covers essentially all of them, so let's walk through the source code to see how the extraction is implemented.

  • Usage
usage: extractor.py [-h] [-sql  SQL] [-nf] [-nk] [-np] [-b BRAND] [-d]
                    input [output]

Extracts filesystem and kernel from Linux-based firmware images

positional arguments:
  input       Input file or directory
  output      Output directory for extracted firmware

optional arguments:
  -h, --help  show this help message and exit
  -sql  SQL   Hostname of SQL server
  -nf         Disable extraction of root filesystem (may decrease extraction
              time)
  -nk         Disable extraction of kernel (may decrease extraction time)
  -np         Disable parallel operation (may increase extraction time)
  -b BRAND    Brand of the firmware image
  -d          Print debug information
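The usage text above maps naturally onto Python's argparse. The following is a minimal sketch that reproduces the same options; the build_parser helper and the dest names are assumptions made for illustration, not necessarily what extractor.py itself uses.

```python
import argparse


def build_parser():
    """Build a parser that mirrors the usage text shown above (illustrative)."""
    parser = argparse.ArgumentParser(
        prog="extractor.py",
        description="Extracts filesystem and kernel from Linux-based "
                    "firmware images")
    parser.add_argument("input", help="Input file or directory")
    parser.add_argument("output", nargs="?", default=None,
                        help="Output directory for extracted firmware")
    parser.add_argument("-sql", dest="sql", default=None,
                        help="Hostname of SQL server")
    # The "disable" switches default to True and are turned off by the flag.
    parser.add_argument("-nf", dest="rootfs", action="store_false",
                        help="Disable extraction of root filesystem")
    parser.add_argument("-nk", dest="kernel", action="store_false",
                        help="Disable extraction of kernel")
    parser.add_argument("-np", dest="parallel", action="store_false",
                        help="Disable parallel operation")
    parser.add_argument("-b", dest="brand", default=None,
                        help="Brand of the firmware image")
    parser.add_argument("-d", dest="debug", action="store_true",
                        help="Print debug information")
    return parser
```

Calling build_parser().parse_args() with the arguments from the example later in this article would yield the SQL host, the debug flag, and the two positional paths.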

1. Implementation principle

1.1 Extractor.extract

The first piece is the Extractor class, which extracts the kernel and file system from firmware images and uses a pool created with multiprocessing.Pool() to extract several firmware images concurrently (the docstring calls it a thread pool, but multiprocessing.Pool() starts worker processes). Calling the extract method of Extractor collects the input files and starts the concurrent extraction.

def extract(self):
    """ Perform extraction of firmware updates from input to tarballs in output directory using a thread pool. """
    # The input may be a directory or a single file; all files are stored in _list
    if os.path.isdir(self._input):
        for path, _, files in os.walk(self._input):
            for item in files:
                self._list.append(os.path.join(path, item))
    elif os.path.isfile(self._input):
        self._list.append(self._input)
    # Create the output directory
    if self.output_dir and not os.path.isdir(self.output_dir):
        os.makedirs(self.output_dir)

    if self._pool:
        # since we have to handle multiple files in one firmware image, it
        # is better to use chunk_size=1 
        # Setting chunksize is meant to speed up processing of large inputs
        # chunk_size set to 1 will throw a timeout exception based on the timeout duration
        chunk_size = 1
        # Consume the iterator so that every file is extracted; this is what
        # actually starts the multi-process extraction
        list(self._pool.imap_unordered(self._extract_item, self._list, chunk_size))
    else:
        for item in self._list:
            self._extract_item(item)
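To see the same pattern in isolation, here is a minimal, self-contained sketch of collecting input files and dispatching them with imap_unordered and a chunk size of 1. The names collect_inputs, file_size, and run_all are hypothetical, and file_size merely stands in for the real per-file extraction work.

```python
import multiprocessing
import os


def collect_inputs(path):
    """Return every file under a directory, or the path itself if it is a file."""
    if os.path.isdir(path):
        return [os.path.join(root, name)
                for root, _, names in os.walk(path)
                for name in names]
    if os.path.isfile(path):
        return [path]
    return []


def file_size(path):
    """Hypothetical stand-in for the per-file extraction work."""
    return path, os.path.getsize(path)


def run_all(worker, items, parallel=True):
    """Apply worker to each item, optionally across a process pool."""
    if not parallel:
        return [worker(item) for item in items]
    with multiprocessing.Pool() as pool:
        # chunksize=1 hands tasks to the workers one at a time; wrapping the
        # lazy iterator in list() forces every task to run before the pool exits.
        return list(pool.imap_unordered(worker, items, 1))
```

A call such as run_all(file_size, collect_inputs("./firmwares")) returns results in completion order rather than input order, which is acceptable here because each firmware image is processed independently.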

Each file is extracted separately by self._extract_item, which internally creates an ExtractionItem and calls its extract method.

def _extract_item(self, path):
    """ Wrapper function that creates an ExtractionItem and calls the extract() method. """
    ExtractionItem(self, path, 0, None, self.debug).extract()

1.2 ExtractionItem.extract

The extract method of ExtractionItem works as follows:

  1. It starts with the exit conditions:

    • Check the extraction status (exit on an interrupt signal or when all extraction is complete).
    • Check whether the recursion limits are exceeded; extraction is recursive, with self-specified breadth and depth limits.
  2. Check whether the MD5 of the firmware is in the visited set, and skip files that have the same checksum and the same status to avoid repeated extraction.

# check if checksum is in visited set
self.printf(">> MD5: %s" % self.checksum)
with Extractor.visited_lock:
    # Skip the same checksum only in the same status
    # asus_latest(FW_RT_N12VP_30043804057.zip) firmware
    if (self.checksum in self.extractor.visited and
            self.extractor.visited[self.checksum] == self.status):
        self.printf(">> Skipping: %s..." % self.checksum)
        return self.get_status()
    else:
        self.extractor.visited[self.checksum] = self.status
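The checksum check can be sketched on its own as follows. This is a minimal illustration, assuming a module-level dictionary and a threading.Lock; md5_of_file and should_skip are hypothetical helper names, and the status is assumed to be any comparable value such as a (kernel_done, rootfs_done) tuple.

```python
import hashlib
import threading

visited = {}                      # checksum -> status it was last seen with
visited_lock = threading.Lock()   # guards concurrent access to `visited`


def md5_of_file(path, block_size=65536):
    """Hash a file block by block so large images are not read at once."""
    digest = hashlib.md5()
    with open(path, "rb") as handle:
        for block in iter(lambda: handle.read(block_size), b""):
            digest.update(block)
    return digest.hexdigest()


def should_skip(checksum, status):
    """Return True if this checksum was already seen with the same status."""
    with visited_lock:
        if checksum in visited and visited[checksum] == status:
            return True
        # First visit, or same file seen in a different status: record it.
        visited[checksum] = status
        return False
```

Keying on both checksum and status means a file that was seen before is processed again if the extraction state has changed since, which matches the comment in the snippet above.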
  3. Check whether the file type is blacklisted; blacklisted file types are not extracted.

First, the MIME type is used to exclude broad categories of files. A file is skipped if its MIME type matches any of the following:

if filetype:
    if any(s in filetype for s in ["application/x-executable", "application/x-dosexec", "application/x-object", "application/x-sharedlib", "application/pdf", "application/msword", "image/", "text/", "video/"]):
        self.printf(">> Skipping: %s..." % filetype)
        return True

Next, the file type reported by Extractor.magic is checked for keywords that identify specific file types:

filetype = Extractor.magic(real_path.encode("utf-8", "surrogateescape"))
if filetype:
    if any(s in filetype for s in ["executable", "universal binary", "relocatable", "bytecode", "applet", "shared"]):
        self.printf(">> Skipping: %s..." % filetype)
        return True

Finally, check for specific file extensions that may have been misidentified

black_lists = ['.dmg', '.so', '.so.0']
for black in black_lists:
    if self.item.endswith(black):
        self.printf(">> Skipping: %s..." % (self.item))
        return True
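Taken together, the three blacklist checks amount to a single predicate. The sketch below is one way to express it, assuming the MIME type and the textual description have already been obtained from libmagic by the caller; is_blacklisted and the constant names are hypothetical.

```python
MIME_BLACKLIST = ["application/x-executable", "application/x-dosexec",
                  "application/x-object", "application/x-sharedlib",
                  "application/pdf", "application/msword",
                  "image/", "text/", "video/"]
DESCRIPTION_BLACKLIST = ["executable", "universal binary", "relocatable",
                         "bytecode", "applet", "shared"]
EXTENSION_BLACKLIST = (".dmg", ".so", ".so.0")


def is_blacklisted(path, mime_type, description):
    """Return True if any of the three blacklist checks matches."""
    # 1. Broad categories, matched as substrings of the MIME type.
    if mime_type and any(s in mime_type for s in MIME_BLACKLIST):
        return True
    # 2. Specific file types, matched as keywords in the description.
    if description and any(s in description for s in DESCRIPTION_BLACKLIST):
        return True
    # 3. Extensions of files that the first two checks may have misidentified.
    return path.endswith(EXTENSION_BLACKLIST)
```

For example, is_blacklisted("libc.so.0", "application/octet-stream", "data") is true only because of the extension check, which is the case the last step is meant to catch.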
  4. Start the actual extraction process

Create a temporary directory and switch to it:

self.temp = tempfile.mkdtemp()
# Move to temporary directory so binwalk does not write to input
os.chdir(self.temp)
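Because os.chdir changes the working directory of the whole process, it is worth restoring it and removing the temporary directory afterwards. The following is a minimal sketch of that idea as a context manager; scratch_directory is a hypothetical helper, not part of the extractor, which does its own clean-up (see the "Cleaning up" lines in the log later on).

```python
import contextlib
import os
import shutil
import tempfile


@contextlib.contextmanager
def scratch_directory():
    """Create a temporary directory, chdir into it, and clean up afterwards."""
    previous = os.getcwd()
    temp = tempfile.mkdtemp()
    os.chdir(temp)
    try:
        yield temp
    finally:
        os.chdir(previous)                       # leave before deleting
        shutil.rmtree(temp, ignore_errors=True)  # remove extracted leftovers
```

Any tool started inside the with block then writes its output next to the temporary directory instead of next to the input file.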

Binwalk performs the extraction; each result is then classified mainly by searching its description for keywords.

try:
    self.printf(">> Tag: %s" % self.tag)
    self.printf(">> Temp: %s" % self.temp)
    self.printf(">> Status: Kernel: %s, Rootfs: %s, Do_Kernel: %s, Do_Rootfs: %s" % (self.get_kernel_status(),
                            self.get_rootfs_status(),
                            self.extractor.do_kernel,
                            self.extractor.do_rootfs))

    for module in binwalk.scan(self.item, "-e", "-r", "-C", self.temp,
                                signature=True, quiet=True):
        prev_entry = None
        for entry in module.results:
            desc = entry.description
            dir_name = module.extractor.directory

            if prev_entry and prev_entry.description == desc and \
                    'Zlib comparessed data' in desc:
                continue
            prev_entry = entry

            self.printf('========== Depth: %d ===============' % self.depth)
            self.printf("Name: %s" % self.item)
            self.printf("Desc: %s" % desc)
            self.printf("Directory: %s" % dir_name)

            self._check_firmware(module, entry)

            if not self.get_rootfs_status():
                self._check_rootfs(module, entry)

            if not self.get_kernel_status():
                self._check_kernel(module, entry)

            if self.update_status():
                self.printf(">> Skipping: completed!")
                return True
            else:
                self._check_recursive(module, entry)


except Exception:
    print ("ERROR: ", self.item)
    traceback.print_exc()
  • Extraction logic
    • Each entry is processed in turn. An entry is skipped only if its description is identical to the previous entry's and also contains the string 'Zlib comparessed data' (spelled as in the source).

    • Call _check_firmware, which extracts directly if the firmware type is known.

      • Check whether the description contains the keyword header.
        • If it contains the keyword uImage header, extract the kernel according to the size given in the description (for uImage).
        • If it contains the keywords rootfs offset and kernel offset, extract the kernel and the file system according to those offsets and the sizes given in the description (for TP-Link or TRX).
    • def _check_rootfs(self, module, entry)

      • Check whether the description contains filesystem, archive, or compressed. If so, check whether typical Unix file system directory names appear in the extraction result:

        UNIX_DIRS = ["bin", "etc", "dev", "home", "lib", "mnt", "opt", "root", "run", "sbin", "tmp", "usr", "var"]
        UNIX_THRESHOLD = 4
    • def _check_kernel(self, module, entry)

      • Check whether the description contains the keyword kernel and, if so, extract the kernel version.
    • Check whether the extraction is complete; if not, recurse into the extracted data until it is. The extraction results are stored in the PostgreSQL database.
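The root file system and kernel checks in the list above can be sketched as two small helpers. This is an illustration under stated assumptions rather than the extractor's own code: looks_like_rootfs and kernel_version are hypothetical names, treating UNIX_THRESHOLD as a minimum count of matching directories is an assumption, and the regular expression is simply modelled on the "Linux kernel version 2.6.31" description that appears in the log in the next section.

```python
import os
import re

UNIX_DIRS = ["bin", "etc", "dev", "home", "lib", "mnt", "opt",
             "root", "run", "sbin", "tmp", "usr", "var"]
UNIX_THRESHOLD = 4


def looks_like_rootfs(directory):
    """Return True if enough typical Unix directories exist in `directory`."""
    count = sum(1 for name in UNIX_DIRS
                if os.path.isdir(os.path.join(directory, name)))
    # Assumption: the threshold is the minimum number of matches required.
    return count >= UNIX_THRESHOLD


def kernel_version(description):
    """Pull a version such as '2.6.31' out of a description string."""
    match = re.search(r"Linux kernel version (\d+(?:\.\d+)+)", description)
    return match.group(1) if match else None
```

Counting directory names is a cheap heuristic: an archive that merely contains a bin folder is not mistaken for a root file system, while a real one easily reaches four matches.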

2. Example

Take the firmware DIR859Ax_FW105b03.bin as an example to walk through the extraction process.

python3 sources/extractor/extractor.py -sql 127.0.0.1 -d ./firmwares/DIR859Ax_FW105b03.bin ./testext2

The output log

cuc@cuc-VirtualBox:~/workspace/FirmAE$ python3 sources/extractor/extractor.py -sql 127.0.0.1 -d ./firmwares/DIR859Ax_FW105b03.bin ./testext2
>> Database Image ID: 2
/home/cuc/workspace/FirmAE/firmwares/DIR859Ax_FW105b03.bin
>> MD5: f0398570673fcc633d35dcbb672b3792
>> Tag: 2
>> Temp: /tmp/tmpt7x2ygb0
>> Status: Kernel: False, Rootfs: False, Do_Kernel: True, Do_Rootfs: True
========== Depth: 0 ===============
Name: /home/cuc/workspace/FirmAE/firmwares/DIR859Ax_FW105b03.bin
Desc: DLOB firmware header, boot partition:"dev=/dev/mtdblock/1"  # firmware header
Directory: /tmp/tmpt7x2ygb0
========== Depth: 0 ===============
Name: /home/cuc/workspace/FirmAE/firmwares/DIR859Ax_FW105b03.bin
Desc: LZMA compressed data, properties: 0x5D, dictionary size: 33554432 bytes, uncompressed size: 3650048 bytes
Directory: /tmp/tmpt7x2ygb0
>>>> Found Linux filesystem in /tmp/tmpt7x2ygb0/_DIR859Ax_FW105b03.bin.extracted/squashfs-root!  # file system found
>> Recursing into LZMA compressed data, properties: 0x5D, dictionary size: 33554432 bytes, uncompressed size: 3650048 bytes ...

/tmp/tmpt7x2ygb0/_DIR859Ax_FW105b03.bin.extracted/74
        >> MD5: ee431f37e5fb18459f3b9e9554e02505
        >> Tag: 2
        >> Temp: /tmp/tmp2bvlhs7m
        >> Status: Kernel: False, Rootfs: True, Do_Kernel: True,                 Do_Rootfs: True
        ========== Depth: 1 ===============
        Name: /tmp/tmpt7x2ygb0/_DIR859Ax_FW105b03.bin.extracted/74
        Desc: Certificate in DER format (x509 v3), header length: 4, sequence length: 30 # certificate
        Directory: /tmp/tmp2bvlhs7m
        ========== Depth: 1 ===============
        Name: /tmp/tmpt7x2ygb0/_DIR859Ax_FW105b03.bin.extracted/74
        Desc: Certificate in DER format (x509 v3), header length: 4, sequence length: 30  # certificate
        Directory: /tmp/tmp2bvlhs7m
        ========== Depth: 1 ===============
        Name: /tmp/tmpt7x2ygb0/_DIR859Ax_FW105b03.bin.extracted/74
        Desc: Certificate in DER format (x509 v3), header length: 4, sequence length: 30  # certificate
        Directory: /tmp/tmp2bvlhs7m
        ========== Depth: 1 ===============
        Name: /tmp/tmpt7x2ygb0/_DIR859Ax_FW105b03.bin.extracted/74
        Desc: Linux kernel version 2.6.31  # 2.6 kernel
        Directory: /tmp/tmp2bvlhs7m
        >>>> Linux kernel version 2.6.31
        >> Skipping: completed!
        >> Cleaning up /tmp/tmp2bvlhs7m...
========== Depth: 0 ===============
Name: /home/cuc/workspace/FirmAE/firmwares/DIR859Ax_FW105b03.bin
Desc: PackImg section delimiter tag, little endian size: 8421120 bytes; big endian size: 8355840 bytes
Directory: /tmp/tmpt7x2ygb0
>> Skipping: completed!
>> Cleaning up /tmp/tmpt7x2ygb0...

The log shows that both the kernel and the file system were extracted.