In this post, we’ll break down a Python script that processes .rpt
files using PySpark. The code handles reading semi-structured data, converting it into structured formats, and outputting it to a specified dataset. Let’s dive in!
1. Imports and Configuration
The first step is setting up the necessary imports and configuring the transformation function:
from pyspark.sql import Row
from pyspark.sql import types as T
from transforms.api import Input, Output, transform
Transform configuration:
@transform(
    source=Input("RID1"),
    output=Output("RID2"),
)
def compute(ctx, source, output):
Feel free to specify a Spark Profile that better fits your needs.
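If you do, the profile is applied with the configure decorator from the same transforms API. A minimal sketch, assuming a profile named EXECUTOR_MEMORY_MEDIUM is enabled on your enrollment (available profile names vary by environment):

from transforms.api import configure

@configure(profile=["EXECUTOR_MEMORY_MEDIUM"])  # example profile name, adjust to your environment
@transform(
    source=Input("RID1"),
    output=Output("RID2"),
)
def compute(ctx, source, output):
    ...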
2. Defining the Data Schema
We define a schema for the output DataFrame to keep the data structured and consistent. Everything is read as a string; casting can be dealt with later.
schema = T.StructType([
    T.StructField("SourceFile", T.StringType(), True),
    T.StructField("Column1", T.StringType(), True),
    T.StructField("Column2", T.StringType(), True),
    T.StructField("Column3", T.StringType(), True),
    T.StructField("Column4", T.StringType(), True),
    T.StructField("Column5", T.StringType(), True),
    T.StructField("Column6", T.StringType(), True),
    # ...
    T.StructField("ColumnN", T.StringType(), True),
])
This schema maps the fields from the .rpt
file into named columns for easier manipulation and analysis.
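Because every field arrives as a string, casting happens downstream once the DataFrame exists. A minimal sketch of what that could look like, assuming the parsed output is available as parsed_df (the column names, target types, and date format below are placeholders, not part of the original script):

from pyspark.sql import functions as F

typed_df = parsed_df.withColumn(
    "Column2", F.col("Column2").cast(T.IntegerType())
).withColumn(
    "Column3", F.to_date(F.col("Column3"), "yyyy-MM-dd")  # assumed date format
)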
3. Parsing Individual Rows
The parse_separator function encapsulates the logic for parsing the separator line, which gives the character position and width of each column:
def parse_separator(separator_string):
    parts = separator_string.split(" ")
    positions = list(map(len, parts))
    start_positions = [sum(positions[:i]) + i for i in range(len(parts))]
    segments = [
        [start_positions[i], start_positions[i] + len(part) + 1]
        for i, part in enumerate(parts)
        if "-" in part
    ]
    return segments
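To make the slicing concrete, here is what the function returns for a short, made-up separator line; each [start, end] pair marks the character span of one column:

# Hypothetical separator line with three groups of dashes
parse_separator("----- --- -------")
# -> [[0, 6], [6, 10], [10, 18]]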
4. Processing Files
The process_file function reads and processes a single .rpt file, returning a list of structured rows:
def process_file(file_status):
    file_path = file_status.path
    file_name = file_path.split("/")[-1]

    with source.filesystem().open(file_path, "r", encoding="utf-8-sig") as f:
        content = f.read().replace("\ufeff", "").splitlines()

    headers = content[0]
    separators = content[1]
    data_lines = content[2:]

    segments = parse_separator(separators)
    parsed_headers = ["SourceFile"] + [
        headers[start:end].strip() for start, end in segments
    ]

    rows = []
    for line in data_lines:
        parsed_values = [file_name] + [
            line[start:end].strip() for start, end in segments
        ]
        rows.append(Row(**dict(zip(parsed_headers, parsed_values))))
    return rows
- file_path.split("/")[-1] extracts the filename from the full path so it can travel with every row as the SourceFile column.
- Each raw data line is sliced into fields using the character positions returned by parse_separator.
- The file is opened through the source dataset's filesystem and split into lines: the first line holds the headers, the second the separator row, and everything after that is data. The small worked example below shows the result on a tiny file.
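Assuming a hypothetical report.rpt containing the three lines below (file contents and column names are invented purely for illustration):

ID    Name       Amount
----- ---------- -------
1     Widget     19.99

parse_separator returns [[0, 6], [6, 17], [17, 25]] for the separator row, and the single data line becomes Row(SourceFile="report.rpt", ID="1", Name="Widget", Amount="19.99"), with every value kept as a string.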
5. Reading and Writing Data
Finally, the code processes all .rpt files in the source dataset and writes the parsed data to the output:
files_rdd = source.filesystem().files("**/*.rpt").rdd
parsed_rdd = files_rdd.flatMap(process_file)
output.write_dataframe(parsed_rdd.toDF(schema))
Here:
- files_rdd lists every .rpt file in the source dataset as an RDD of file statuses.
- flatMap applies process_file to each file and flattens the per-file lists into a single RDD of structured rows.
- The RDD is converted into a DataFrame using the pre-defined schema and written to the output.
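For orientation, here is one way the pieces above can fit together. Because process_file references source, it has to be defined where source is in scope, for example nested inside compute (schema and helper bodies elided in this sketch):

@transform(
    source=Input("RID1"),
    output=Output("RID2"),
)
def compute(ctx, source, output):
    schema = T.StructType([...])  # section 2

    def process_file(file_status):  # section 4; closes over `source`
        ...

    files_rdd = source.filesystem().files("**/*.rpt").rdd
    parsed_rdd = files_rdd.flatMap(process_file)
    output.write_dataframe(parsed_rdd.toDF(schema))

parse_separator has no such dependency and can live at module level.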
6. Why This Design?
The script emphasizes:
- Modularity: Functions like parse_separator and process_file break the logic into manageable units.
- Scalability: Leveraging PySpark ensures efficient processing of large datasets.
- Readability: It's simple, so you won't have to re-code it in two months.