demand

Customers want to use Spark to analyze the number and ratio of 0s and 1s in binary files. If you want to analyze a directory, analyze each file in the directory individually. The analysis result is saved in a log file with the same name as the file to be analyzed. The log file contains the number and ratio of 0 and 1 characters.

Requirements: If the value is converted to less than eight bits of binary, you need to fill the left side with 0.

You can view the contents of binaries under Linux. Command:

XXD -- b -- c 1 filename

Copy the code

-c 1 displays 1 column of 1 character, -b displays binary

Python version

code

# This Python file uses the following encoding: utf-8



from __future__ import division

import os

import time

import sys

from pyspark import SparkConf, SparkContext



APP_NAME = "Load Bin Files"





def main(spark_context, path):

file_paths = fetch_files(path)

for file_path in file_paths:

outputs = analysis_file_content(spark_context, path + "/" + file_path)

print_outputs(outputs)

save_outputs(file_path, outputs)





def fetch_files(path):

if os.path.isfile(path):

return [path]

return os.listdir(path)





def analysis_file_content(spark_context, file_path):

data = spark_context.binaryRecords(file_path, 1)

records = data.flatMap(lambda d: list(bin(ord(d)).replace('0b', '').zfill(8)))

mapped_with_key = records.map(lambda d: ('0', 1) if d == '0' else ('1', 1))

result = mapped_with_key.reduceByKey(lambda x, y: x + y)



total = result.map(lambda r: r[1]).sum()

return result.map(lambda r: format_outputs(r, total)).collect()





def format_outputs(value_with_key, total):

tu = (value_with_key[0], value_with_key[1], value_with_key[1] / total * 100)

Return "the number of characters {0} is {1} and the ratio is {2:.2f}%". Format (*tu)





def print_outputs(outputs):

for output in outputs:

print output





def save_outputs(file_path, outputs):

result_dir = "result"

if not os.path.exists(result_dir):

os.mkdir(result_dir)



output_file_name = "result/" + file_name_with_extension(file_path) + ".output"

with open(output_file_name, "a") as result_file:

for output in outputs:

result_file.write(output + "\n")

Result_file.write (" count in {0}\n\n". Format (format_logging_time()))





def format_logging_time():

return time.strftime('%Y-%m-%d %H:%m:%s', time.localtime(time.time()))





def file_name_with_extension(path):

last_index = path.rfind("/") + 1

length = len(path)

return path[last_index:length]





if __name__ == "__main__":

conf = SparkConf().setMaster("local[*]")

conf = conf.setAppName(APP_NAME)

sc = SparkContext(conf=conf)



if len(sys.argv) ! = 2:

Print (" Please enter the correct file or directory path ")

else:

main(sc, sys.argv[1])

Copy the code

The core logic is all in the Analysis_file_content method.

run

Python is a script file that does not need to be compiled. However, to run, You must have PySpark installed. Run the following command:

./bin/spark-submit /Users/zhangyi/PycharmProjects/spark_binary_files_demo/parse_files_demo.py "files"

Copy the code

In the pit of

Development environment issues

To use Python under Spark, pySpark needs to be installed using PIP first. As a result, the installation always fails. Python third-party libraries address is https://pypi.python.org/simple/, access is slow in China. Through searching questions, many articles mentioned domestic mirror libraries, such as douban’s library, only to find pySpark when installing.

Looking at the cause of the installation error, it is not that the library cannot be accessed, it is just that the access is slow and the download fails less than 8% of the time. This is actually why the connection timed out. The connection timeout value can therefore be modified. PIP can be added to ~/.pip/ PIP.

[global]

timeout = 6000

Copy the code

The installation is still slow, but at least PySpark is installed. Py4j is installed on a MAC. Py4j is installed on a MAC.

OSError: [1] Errno Operation not permitted: '/ System/Library/Frameworks/Python framework Versions / 2.7 / share'

Copy the code

This error occurs even if the installation is sudo and installed as an administrator. The solution is to perform the following installation:

pip install --upgrade pip



sudo pip install numpy --upgrade --ignore-installed



sudo pip install scipy --upgrade --ignore-installed



sudo pip install scikit-learn --upgrade --ignore-installed

Copy the code

Sudo PIP install PySpark is installed correctly.

Character encoding pit

Both the prompt message and the final analysis result contain Chinese. When you run the code, the following error message is displayed:

SyntaxError: Non-ASCII character '\xe5' in file /Users/zhangyi/PycharmProjects/spark_binary_files_demo/parse_files_demo.py on line 36, but no encoding declared; see http://python.org/dev/peps/pep-0263/ for details

Copy the code

You need to add the following encoding declaration to the first line of the code file:

# This Python file uses the following encoding: utf-8

Copy the code

The pit of SparkConf

The code to initialize SparkContext looks like this:

conf = SparkConf().setMaster("local[*]")

conf = conf.setAppName(APP_NAME)

sc = SparkContext(conf)

Copy the code

Result report runtime error:

Error initializing SparkContext.

org.apache.spark.SparkException: Could not parse Master URL: '<pyspark.conf.SparkConf object at 0x106666390>'

Copy the code

According to the error message, there is a problem with setting the Master. In fact, there is a problem with instantiating SparkContext. Reading the code, its constructor declaration looks like this:

def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,

environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,

gateway=None, jsc=None, profiler_cls=BasicProfiler):

Copy the code

The previous code simply passes conf to the SparkContext constructor, which causes Spark to treat conf as the value of the master parameter, which defaults to the first parameter. So there is a name argument:

sc = SparkContext(conf = conf)

Copy the code

The pit of sys. Argv

I need to pass in the path of the file I need to analyze when executing the Python script file using spark-submit. Unlike Scala and Java. Scala’s main argument, argv, actually accepts arguments from the command line. Python cannot do this. You can only use the sys module to receive command line arguments, sys.argv.

Argv is a list of types. When we get the parameter values passed in via sys.argv, be aware that it defaults to the python script file path to be executed after Spark-submit as the first argument, and subsequent arguments as the second. For example, run the following command:

./bin/spark-submit /Users/zhangyi/PycharmProjects/spark_binary_files_demo/parse_files_demo.py "files"

Copy the code

Is:

  • argv[0]: /Users/zhangyi/PycharmProjects/spark_binary_files_demo/parse_files_demo.py
  • argv[1]: files

Therefore, I need to get the files folder name, which SHOULD be argv[1].

Also, since argv is a list and has no size attribute, its length should be obtained by the len() method and is expected to be 2.

The pit in which integers participate in division

In Python 2.7, if you divide integers directly, the result is to remove decimals. So 4/5 is going to be 0. In Python 3, this operation is automatically converted to floating point.

The easiest way to solve this problem is to import a ready-made module:

from __future__ import division

Copy the code

Note: This import declaration should precede all import declarations.

Scala version

code

package bigdata.demo



import java.io.File

import java.text.SimpleDateFormat

import java.util.Calendar



import com.google.common.io.{Files => GoogleFiles}

import org.apache.commons.io.Charsets

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}



object Main {

def main(args: Array[String]): Unit = {

val conf = new SparkConf().setAppName("Binary Files").setMaster("local[*]")

val sc = new SparkContext(conf)



if (args.size ! = 1) {

Println (" Please enter the correct file or directory path ")

return

}



def analyseFileContent(filePath: String): RDD[String] = {

val data = sc.binaryRecords(filePath, 1)

val records = data.flatMap(x => x.flatMap(x => toBinaryStr(byteToShort(x)).toCharArray))

val mappedWithKey = records.map(i => if (i == '0') ('0', 1L) else ('1', 1L))

val result = mappedWithKey.reduceByKey(_ + _)



val sum = result.map(_._2).sum()

result.map { case (key, count) => formatOutput(key, count, sum)}

}



val path = args.head

val filePaths = fetchFiles(path)

filePaths.par.foreach { filePath =>

val outputs = analyseFileContent(filePath)

printOutputs(outputs)

saveOutputs(filePath, outputs)

}

}



private def byteToShort(b: Byte): Short =

if (b < 0) (b + 256).toShort else b.toShort



private def toBinaryStr(i: Short, digits: Int = 8): String =

String.format("%" + digits + "s", i.toBinaryString).replace(' ', '0')



private def printOutputs(outputs: RDD[String]): Unit = {

outputs.foreach(println)

}



private def saveOutputs(filePath: String, outputs: RDD[String]): Unit = {

val resultDir = new File("result")

if (! resultDir.exists()) resultDir.mkdir()



val resultFile = new File("result/" + getFileNameWithExtension(filePath) + ".output")

outputs.foreach(line => GoogleFiles.append(line + "\n", resultFile, Charsets.UTF_8))

Googlefiles.append (s" count in: ${formatLoggingTime()}\n\n", resultFile, charsets.utf_8)

}



private def formatLoggingTime(): String = {

val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

formatter.format(Calendar.getInstance().getTime)

}



private def getFileNameWithExtension(filePath: String): String = {

filePath.substring(filePath.lastIndexOf("/") + 1)

}



private def fetchFiles(path: String): List[String] = {

val fileOrDirectory = new File(path)

fileOrDirectory.isFile match {

case true => List(path)

case false => fileOrDirectory.listFiles().filter(_.isFile).map(_.getPath).toList

}

}



private def formatPercent(number: Double): String = {

Val percent = "%1.2f" format number * 100

s"${percent}%"

}



private def formatOutput(key: Char, count: Long, sum: Double): String = {

${formatPercent(count/sum)} ${formatPercent(count/sum)}"

}

}

Copy the code

run

After the code is compiled and packaged using SBT, the JAR file is generated. Then run in the Spark home directory:

$SPARK_HOME/bin/spark-submit --class bigdata.demo.Main --master spark://<ip> $SPARK_HOME/jars/binaryfilesstastistics_2. 11-1.0. The jar file:///share/spark-2.2.0-bin-hadoop2.7/derby.log

Copy the code

The last parameter “file:///share/spark-2.2.0-bin-hadoop2.7/derby.log” is the parameter received by main, that is, the file directory to analyze. If it is a local directory, you need to specify the file protocol ://. If it is a HDFS directory, you need to specify HDFS ://.

In the pit of

The value is of type byte

In Scala, the Byte type is an 8-bit signed complement integer. The value ranges from -128 to 127. If the binary value is 11111111, Byte data read through SparkContext’s binaryRecords() method will have the value -1 instead of 255. The reason is the complement. If the decimal value is 128, the value is -128 after it is converted to Byte.

For 1, if you execute toBinaryString (), then the string “11111111111111111111111111111111”, rather than we are looking forward to the “11111111”. As shown below:

For an eight-bit binary value, you can write a method that converts Byte to Short and then calls the toBinaryString() method to convert the corresponding binary string.

private def byteToShort(b: Byte): Short =

if (b < 0) (b + 256).toShort else b.toShort

Copy the code

For less than eight bits of binary value, the binary string will be less than eight bits if you call toBinaryString() directly. We can use String format to format:

private def toBinaryStr(i: Short, digits: Int = 8): String =

String.format("%" + digits + "s", i.toBinaryString).replace(' ', '0')

Copy the code

Of course, you can define these two methods as implicit methods of Byte and Short.

  • Author: Zhang Yi
  • Links to this article: Zhangyi. Xyz/read – binary…
  • Copyright Notice: All articles on this blog are licensed under a CC BY-NC-SA 3.0 license unless otherwise stated. Reprint please indicate the source!