Background

This section covers the basics of reading Excel files with Pandas and writing Excel output:

  • Specify the sheet, select the desired columns, rename columns, and specify column types while reading
  • Use Spark to analyze and manipulate the data (Pandas alone is fine for simple manipulation)
  • Write Excel files with a single sheet and with multiple sheets
import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('JupyterPySpark').enableHiveSupport().getOrCreate()

Reading Excel

When reading an Excel file with Pandas:

  • The usecols parameter selects columns. In pandas <= 0.20 it accepts column names; in 0.21 you have to pass column indices instead, e.g. usecols=[2,6,7]. Reportedly fixed in 0.23.0.
  • The names parameter renames the selected columns; it is a list of all the new field names.
  • Column types can be specified with the converters parameter (or dtype), which takes a dictionary.
# All fields
# cols = ['Type', 'Household code', 'Household name', 'Account type', 'Account opening institution', 'Institution full name', "Beneficiary's account name", 'Payee account number']
# names = ['type','code','name','acct_type','acct_org','org_name','acct_name','acct_code']

# Select only some of the fields
cols = ['Household name', "Beneficiary's account name", 'Payee account number']  # pandas <= 0.20 can select by column name
# cols = [2,6,7]  # pandas 0.21 and later select by column index
names = ['name', 'acct_name', 'acct_code']


pddf_meta = pd.read_excel('./sample.xlsx', sheet_name='Bank Account relationship', header=0, usecols=cols, names=names, converters={'Payee account number': str}).dropna()
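
A quick check (not part of the original notebook) can confirm that the converter kept the 19-digit account numbers as strings instead of letting Pandas cast them to floats:

# Illustrative sanity check: 'acct_code' should show dtype 'object' (string),
# not 'float64', if the converter took effect
print(pddf_meta.dtypes)
print(pddf_meta.head())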

Use Spark to process data

  • Convert the Pandas DF to a Spark DF
  • When constructing the Spark DF, explicitly specify a schema to avoid type errors

Pandas DF to Spark DF

from pyspark.sql.types import *

# Explicitly specify the structure (schema) of the DF
schema = StructType().add('name', StringType()).add('acct_name', StringType()).add('acct_code', StringType())

df_meta = spark.createDataFrame(pddf_meta, schema)
df_meta.show(truncate=False)
+-------------------------------------+---------+-------------------+
|name                                 |acct_name|acct_code          |
+-------------------------------------+---------+-------------------+
|Universal Center City Shunxing Snacks|Xiao Ming|6228450123084500000|
|Chengdu Dan Lou Little Turrets       |Xiao Hong|6222629123002790000|
|Universal Center City Shunxing Snacks|Xiao Ming|6228450123084500000|
+-------------------------------------+---------+-------------------+
df_meta.registerTempTable('df_meta')
# Use Spark SQL to aggregate the data
df_stats = spark.sql('select acct_code, acct_name, count(1) as cnt from df_meta group by acct_code, acct_name order by 1,2')
df_stats.show(5)
+-------------------+---------+---+
|          acct_code|acct_name|cnt|
+-------------------+---------+---+
|6222629123002790000|Xiao Hong|  1|
|6228450123084500000|Xiao Ming|  2|
+-------------------+---------+---+
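
The same aggregation can also be written with the DataFrame API instead of SQL. This is a sketch, not from the original notebook; df_stats_api is an illustrative name:

# Equivalent aggregation with the DataFrame API (same result as the SQL above)
from pyspark.sql import functions as F

df_stats_api = df_meta.groupBy('acct_code', 'acct_name') \
                      .agg(F.count(F.lit(1)).alias('cnt')) \
                      .orderBy('acct_code', 'acct_name')
df_stats_api.show(5)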

Union All operation

Spark’s unionAll() and union() are the same method and neither removes duplicates (note the difference from SQL's UNION). Below we intentionally construct duplicate rows.

union() is officially recommended.

df_tmp_1 = df_stats
df_tmp_2 = df_stats.filter("acct_name in ('Xiao Ming')")
df_result = df_tmp_1.union(df_tmp_2)
df_result.show()
+-------------------+---------+---+
|          acct_code|acct_name|cnt|
+-------------------+---------+---+
|6222629123002790000|Xiao Hong|  1|
|6228450123084500000|Xiao Ming|  2|
|6228450123084500000|Xiao Ming|  2|
+-------------------+---------+---+
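
If SQL UNION semantics (with de-duplication) are actually wanted, duplicates have to be dropped explicitly. A minimal sketch; df_result_dedup is an illustrative name:

# union() keeps duplicates, so drop them afterwards to mimic SQL UNION
df_result_dedup = df_tmp_1.union(df_tmp_2).dropDuplicates()
df_result_dedup.show()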

Add a serial number

Depending on business requirements, we may need to add a serial number to each row of the output. Below we use SparkSQL's row_number() function and the native Spark function API respectively; there is no essential difference between the two approaches.

SparkSQL approach

df_result.registerTempTable('df_result')
df_result_1 = spark.sql("select row_number() over(order by r.cnt desc) as rn, r.* from df_result r")
df_result_1.show()
+---+-------------------+---------+---+
| rn|          acct_code|acct_name|cnt|
+---+-------------------+---------+---+
|  1|6228450123084500000|Xiao Ming|  2|
|  2|6228450123084500000|Xiao Ming|  2|
|  3|6222629123002790000|Xiao Hong|  1|
+---+-------------------+---------+---+

Spark function approach

import pyspark.sql.functions as F
from pyspark.sql.window import Window

df_result_2 = df_result.withColumn('rn', F.row_number().over(Window.orderBy(F.col('cnt').desc()))) \
        .select(['rn', 'acct_code', 'acct_name', 'cnt'])
df_result_2.show()
+---+-------------------+---------+---+
| rn|          acct_code|acct_name|cnt|
+---+-------------------+---------+---+
|  1|6228450123084500000|Xiao Ming|  2|
|  2|6228450123084500000|Xiao Ming|  2|
|  3|6222629123002790000|Xiao Hong|  1|
+---+-------------------+---------+---+

Use Pandas to export Excel

  • Convert to Pandas DF
  • Export Excel
# Convert to a Pandas DF and rename the fields
pddf_meta_out = df_result_2.toPandas()

pddf_meta_out.columns = ['Serial number', 'Payee account number', "Beneficiary's account name", 'Count']

Single Sheet

pddf_meta_out.to_excel('./sample_out_single.xlsx',sheet_name='output stats', index=False)

Multiple Sheets

  • ExcelWriter can write multiple sheets to one file
  • Use startrow to specify the starting cell position
writer = pd.ExcelWriter('./sample_out_multi.xlsx')
pddf_meta_out.to_excel(writer,sheet_name='output stats1', index=False)
pddf_meta_out.to_excel(writer,sheet_name='output stats2', index=False, startrow=1)
writer.save()
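
In newer Pandas versions, ExcelWriter.save() is deprecated in favour of closing the writer; using the writer as a context manager, as sketched below, saves and closes the file on exit. This is an alternative to the block above, not part of the original notebook:

# Same multi-sheet export, written with a context manager (no explicit save/close)
with pd.ExcelWriter('./sample_out_multi.xlsx') as writer:
    pddf_meta_out.to_excel(writer, sheet_name='output stats1', index=False)
    pddf_meta_out.to_excel(writer, sheet_name='output stats2', index=False, startrow=1)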

Check whether the output files were generated

! pwd && ls -lh
/home
total 60K
-rw-rw-r-- 1 etl etl  14K May  2 20:44 pandas_excel_in_out.ipynb
-rw-rw-r-- 1 etl etl 6.0K May  2 20:45 sample_out_multi.xlsx
-rw-rw-r-- 1 etl etl 5.5K May  2 20:45 sample_out_single.xlsx
-rw-rw-r-- 1 etl etl  12K Apr 27 15:24 sample.xlsx