We are currently facing a challenge with processing and integrating a large volume of Excel files, each exhibiting a unique schema and representing a pivoted format of the underlying data. The goal is to ingest, infer schemas, clean, melt (unpivot), and ultimately perform a UNION operation on all files.
For a single dataset, our approach typically involves:
- Extracting data from the file.
- Ingesting the data into a dataset with an identical name to the original file.
- Assigning a STRING data type to all columns to ensure schema compatibility.
- Executing transformations to correct data types and column names.
- Unpivoting the dataset.
Could you provide guidance or best practices on scaling this process to handle numerous distinct files, each requiring individual ingestion and schema inference, before combining them into a unified dataset?
Ingesting and processing a large volume of distinct files can be streamlined as follows.
1. File Ingestion:
Ingest all the files into a single dataset (not the main topic here).
2. Schema Inference:
We opted for direct raw file access and manually set the type of every column to STRING.
from transforms.api import Pipeline, transform, Input, Output
from myproject import datasets
from pyspark.sql import Row
from openpyxl import load_workbook
import tempfile
import shutil
import re
import logging

log = logging.getLogger(__name__)
my_pipeline = Pipeline()


# input_path and output_path stand for the raw-file dataset and the parsed output
@transform(
    the_output=Output(output_path),
    the_input=Input(input_path),
)
def my_compute_function(the_input, the_output):

    def process_file(file_status):
        with tempfile.NamedTemporaryFile() as tmp:
            # Copy the raw file to local disk so openpyxl can read it
            with the_input.filesystem().open(file_status.path, 'rb') as f:
                shutil.copyfileobj(f, tmp)
                tmp.flush()  # shutil.copyfileobj does not flush

            # xlsx is a binary format, so reopen the copy in binary mode
            with open(tmp.name, 'rb') as t:
                log.info("Processing: %s", file_status.path)
                ws = load_workbook(t)['Sheet1']

                # Workbook rows are indexed at 1, so ws[1] is the header row
                column_list = [cell.value for cell in ws[1]]
                # Replace characters that are invalid in dataset column names
                column_list = [re.sub(r'[ ,;{}()\n\t=]', '_', str(x)) for x in column_list]
                log.info("Column list: %s", column_list)
                MyRow = Row(*column_list)

                # Cast every cell to a string so that all columns are inferred as STRING.
                # The [2:] slice skips the first two worksheet rows.
                for row in list(ws.rows)[2:]:
                    yield MyRow(*[str(cell.value) for cell in row])

    files_df = the_input.filesystem().files('**/*.xlsx')
    processed_df = files_df.rdd.flatMap(process_file).toDF()
    the_output.write_dataframe(processed_df)


my_pipeline.add_transforms(my_compute_function)
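Because every file has its own columns, one option is to unpivot each file while it is being read, so that all files produce records with the same four fields and can be combined without reconciling schemas. The sketch below is only an illustration of that idea, not part of the transform above: the helper name `melt_rows`, the `id_count` parameter, and the output field order are all assumptions.

```python
def melt_rows(source, header, rows, id_count=1):
    """Yield one (source, row_id, variable, value) tuple per value cell.

    Hypothetical helper: the first `id_count` columns are treated as
    identifier columns, every remaining column is unpivoted.
    """
    value_names = header[id_count:]
    for row in rows:
        # Join the identifier cells into a single key for the row
        row_id = "|".join(str(v) for v in row[:id_count])
        for name, value in zip(value_names, row[id_count:]):
            # Keep empty cells as None, cast everything else to a string
            yield (source, row_id, name, None if value is None else str(value))


if __name__ == "__main__":
    header = ["region", "2020", "2021"]
    rows = [["EU", 10, 12], ["US", 7, None]]
    for record in melt_rows("sales.xlsx", header, rows):
        print(record)
```

Since every file then yields the same field layout, the records from all files can share one explicit schema instead of relying on inference from the first rows.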
Further cleaning and parsing can happen downstream.
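If the unpivot and the UNION are left to a downstream transform, they could look roughly like the following. This is a minimal sketch under stated assumptions: the helper names `stack_expr`, `melt`, and `union_all` are made up for illustration, all value columns are assumed to already be STRING (Spark's `stack` needs matching types), column names are assumed to contain no backticks or single quotes, and every melted DataFrame is assumed to share the same identifier columns.

```python
from functools import reduce


def stack_expr(value_cols, var_name="variable", value_name="value"):
    """Build a Spark SQL stack() expression that unpivots value_cols."""
    # Each pair is the column name as a literal followed by the column itself
    pairs = ", ".join("'{0}', `{0}`".format(c) for c in value_cols)
    return "stack({0}, {1}) as ({2}, {3})".format(
        len(value_cols), pairs, var_name, value_name)


def melt(df, id_cols):
    """Unpivot every column of a Spark DataFrame that is not in id_cols."""
    value_cols = [c for c in df.columns if c not in id_cols]
    id_exprs = ["`{0}`".format(c) for c in id_cols]
    return df.selectExpr(*(id_exprs + [stack_expr(value_cols)]))


def union_all(dfs):
    """Union DataFrames by column name rather than by position."""
    return reduce(lambda left, right: left.unionByName(right), dfs)
```

For example, `union_all([melt(df, ["region"]) for df in per_file_dfs])` would produce one long dataset, where `per_file_dfs` is a hypothetical list holding one cleaned DataFrame per source file.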