Pyetl is an ETL framework written in pure Python. Compared with ETL tools such as Sqoop and DataX, pyetl can attach a UDF function to each field, making the data transformation process more flexible. Compared with professional ETL tools, pyetl is lighter, and because it is driven by pure Python code it fits developer habits better.

Installation

pip3 install pyetl

Usage examples

Data synchronization between database tables

from pyetl import Task, DatabaseReader, DatabaseWriter
reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer = DatabaseWriter("sqlite:///db2.sqlite3", table_name="target")
Task(reader, writer).start()
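The example assumes db1.sqlite3 already holds a populated source table. To try it end to end, here is a minimal setup sketch using only the standard library; the schema below is illustrative, and creating the target table up front is an assumption to keep the write side simple:

import sqlite3

# Populate a sample source table in db1.sqlite3
conn = sqlite3.connect("db1.sqlite3")
conn.execute("create table if not exists source(id integer, name text)")
conn.executemany("insert into source(id, name) values (?, ?)", [(1, "alice"), (2, "bob")])
conn.commit()
conn.close()

# Create a matching target table in db2.sqlite3 for DatabaseWriter to load into
conn = sqlite3.connect("db2.sqlite3")
conn.execute("create table if not exists target(id integer, name text)")
conn.commit()
conn.close()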

Database table synchronized to a Hive table

from pyetl import Task, DatabaseReader, HiveWriter2
reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer = HiveWriter2("hive://localhost:10000/default", table_name="target")
Task(reader, writer).start()

Database table synchronized to Elasticsearch

from pyetl import Task, DatabaseReader, ElasticSearchWriter
reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer = ElasticSearchWriter(hosts=["localhost"], index_name="target")
Task(reader, writer).start()
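To spot-check that documents actually landed in the index, the official elasticsearch client can be used; a sketch assuming Elasticsearch is reachable on the default port 9200:

from elasticsearch import Elasticsearch

# Assumes a local Elasticsearch node on the default port
es = Elasticsearch(["http://localhost:9200"])
print(es.count(index="target"))  # returns a body like {'count': <n>, ...}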

When the source table and target table have different field names, add a field mapping

reader = DatabaseReader("sqlite:///db.sqlite3", table_name="source")
writer = DatabaseWriter("sqlite:///db.sqlite3", table_name="target")
# columns maps target table fields to source table fields:
# target id comes from source uuid, target name from source full_name
columns = {"id": "uuid", "name": "full_name"}
Task(reader, writer, columns=columns).start()

Add UDF mappings to fields for rule validation, data standardization, data cleaning, and so on

functions = {"id": str, "name": lambda x: x.strip()}
Task(reader, writer, columns=columns, functions=functions).start()
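Because UDFs are plain Python callables applied field by field, cleaning and validation rules can be expressed directly. A small illustrative sketch; the field names and rules below are examples, not part of pyetl:

functions = {
    "id": str,                                   # standardize the type to string
    "name": lambda x: x.strip().lower(),         # clean whitespace and normalize case
    "email": lambda x: x if "@" in x else None,  # simple rule check: null out invalid values
}
Task(reader, writer, functions=functions).start()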

Inherit from the Task class to extend ETL tasks flexibly

import json
from pyetl import Task, DatabaseReader, DatabaseWriter

class NewTask(Task):
    reader = DatabaseReader("sqlite:///db.sqlite3", table_name="source")
    writer = DatabaseWriter("sqlite:///db.sqlite3", table_name="target")

    def get_columns(self):
        """Generate the field mapping configuration with a function,
        e.g. load the mapping from a configuration table in the database"""
        sql = "select columns from task where name='new_task'"
        columns = self.writer.db.read_one(sql)["columns"]
        return json.loads(columns)

    def get_functions(self):
        """Generate the UDF mapping of fields with a function.
        The example below converts every field to a string"""
        return {col: str for col in self.columns}

    def apply_function(self, record):
        """A UDF applied to an entire record in the data stream"""
        record["flag"] = int(record["id"]) % 2
        return record

    def before(self):
        """Operations to perform before the task starts,
        such as initializing a task table or creating the target table"""
        sql = "create table destination_table(id int, name varchar(100))"
        self.writer.db.execute(sql)

    def after(self):
        """Operations to perform after the task completes,
        such as updating the task status"""
        sql = "update task set status='done' where name='new_task'"
        self.writer.db.execute(sql)

NewTask().start()

Currently implemented Readers and Writers

Reader               Introduction
DatabaseReader       Reads from all relational databases
FileReader           Reads structured text data, such as CSV files
ExcelReader          Reads Excel files

Writer               Introduction
DatabaseWriter       Writes to all relational databases
ElasticSearchWriter  Batch-writes data to an ES index
HiveWriter           Batch-inserts into Hive tables
HiveWriter2          Imports into Hive tables via load data (recommended)
FileWriter           Writes data to a text file
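As an example of the file-based components, a CSV file could be loaded into a database by pairing FileReader with DatabaseWriter. A hypothetical sketch: the constructor argument shown for FileReader is an assumption, so check the pyetl docs for the actual signature:

from pyetl import Task, FileReader, DatabaseWriter

# Hypothetical usage: FileReader's exact parameters may differ
reader = FileReader("data.csv")
writer = DatabaseWriter("sqlite:///db.sqlite3", table_name="target")
Task(reader, writer).start()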

Conclusion

That’s it for this introduction to the Python ETL tool pyetl.