[Guide] Processing .rpt files with Code Repositories: A Practical Approach

In this post, we’ll break down a Python script that processes .rpt files using PySpark. The code handles reading semi-structured data, converting it into structured formats, and outputting it to a specified dataset. Let’s dive in!

1. Imports and Configuration

The first step is setting up the necessary imports and configuring the transformation function:

from pyspark.sql import Row
from pyspark.sql import types as T
from transforms.api import Input, Output, transform

Transform configuration:

@transform(
    source=Input("RID1"),
    output=Output("RID2"),
)
def compute(ctx, source, output):

Feel free to specify a Spark Profile that better fits your needs.
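
For example, a profile can be attached with the configure decorator; the profile name below is only a placeholder for whatever is enabled in your environment:

from transforms.api import configure  # alongside the imports above

@configure(profile=["EXECUTOR_MEMORY_MEDIUM"])  # placeholder profile name
@transform(
    source=Input("RID1"),
    output=Output("RID2"),
)
def compute(ctx, source, output):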

2. Defining the Data Schema

We define a schema for the output DataFrame, ensuring structured and consistent data. Everything is read as a string; casting to the right types can be handled afterwards.

schema = T.StructType([
    T.StructField("SourceFile", T.StringType(), True),
    T.StructField("Column1", T.StringType(), True),
    T.StructField("Column2", T.StringType(), True),
    T.StructField("Column3", T.StringType(), True),
    T.StructField("Column4", T.StringType(), True),
    T.StructField("Column5", T.StringType(), True),
    T.StructField("Column6", T.StringType(), True),
    ...
    T.StructField("ColumnN", T.StringType(), True),
])

This schema maps the fields from the .rpt file into named columns for easier manipulation and analysis.
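
If typed columns are needed downstream, the casting can happen right after the DataFrame is built in step 5 (called parsed_df here for illustration); the column names and formats below are placeholders only:

from pyspark.sql import functions as F

typed_df = (
    parsed_df
    .withColumn("Column1", F.col("Column1").cast("int"))
    .withColumn("Column2", F.to_date(F.col("Column2"), "yyyy-MM-dd"))
)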

3. Parsing the Separator Line

The parse_separator function encapsulates the logic for parsing the separator line (the row of dashes beneath the header), which provides the character width of each column:

    def parse_separator(separator_string):
        parts = separator_string.split(" ")
        positions = list(map(len, parts))
        start_positions = [sum(positions[:i]) + i for i in range(len(parts))]
        segments = [
            [start_positions[i], start_positions[i] + len(parts[i]) + 1]
            for i, part in enumerate(parts)
            if "-" in part
        ]
        return segments
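
As a quick illustration of what this returns (the separator string below is made up):

separator = "----- --- -------"
parse_separator(separator)
# -> [[0, 6], [6, 10], [10, 18]]
# Each pair is a [start, end) slice; the extra trailing character in each
# slice is the space between columns, which .strip() removes later.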

4. Processing Files

The process_file function reads and processes a single .rpt file, returning a list of structured rows:

    def process_file(file_status):
        file_path = file_status.path
        file_name = file_path.split("/")[-1]

        with source.filesystem().open(file_path, "r", encoding="utf-8-sig") as f:
            content = f.read().replace("\ufeff", "").splitlines()
            headers = content[0]
            separators = content[1]
            data_lines = content[2:]
            segments = parse_separator(separators)
            parsed_headers = ["SourceFile"] + [
                headers[start:end].strip() for start, end in segments
            ]

            rows = []
            for line in data_lines:
                parsed_values = [file_name] + [
                    line[start:end].strip() for start, end in segments
                ]
                rows.append(Row(**dict(zip(parsed_headers, parsed_values))))
            return rows
  • file_path.split("/")[-1] extracts the file name from the full path.
  • Each raw line is sliced into fields based on the character positions computed from the separator line.

The function opens the file through the dataset's filesystem, splits the content into lines, and uses the first line as the header and the second as the separator row; everything after that is data.

5. Reading and Writing Data

Finally, the code processes all .rpt files in the source dataset and writes the parsed data to the output:

    files_rdd = source.filesystem().files("**/*.rpt").rdd
    parsed_rdd = files_rdd.flatMap(process_file)

    output.write_dataframe(parsed_rdd.toDF(schema))

Here:

  • files_rdd lists all .rpt files in the source dataset as an RDD of file metadata rows.
  • flatMap applies process_file to each file and flattens the per-file lists into a single RDD of structured rows.
  • The RDD is converted into a DataFrame using the pre-defined schema and written to the output.

6. Why This Design?

The script emphasizes:

  • Modularity: Functions like parse_separator and process_file break down the logic into manageable units.
  • Scalability: Leveraging PySpark ensures efficient processing of large datasets.
  • Readability: The logic is simple enough that you won't need to rewrite it in two months.

7. Input File
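
For reference, a minimal made-up example of the layout the parser expects (a header line, a separator line of dashes, then fixed-width data rows):

Name  Qty Comment
----- --- -------
Alice 42  Restock
Bob   7   Ship

With the transform above, the first data row would come out as Row(SourceFile='example.rpt', Name='Alice', Qty='42', Comment='Restock'); the file name here is made up as well.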
