CSV files instead of Parquet?

Is there a way to store my data as CSV files in a Dataset instead of saving them in Parquet format?

For context, I would like to be able to make those CSVs available to another system, which would pull them from Foundry via the S3 API or export them via other means (including local download).

You can specify the output format in the write_dataframe() method:

from transforms.api import transform, Input, Output


@transform(
    output=Output("path/to/output/dataset"),
    my_input=Input("path/to/input/dataset"),
)
def my_compute_function(output, my_input):
    # output_format is forwarded to the underlying Spark writer
    output.write_dataframe(my_input.dataframe(), output_format="csv")

Once a schema has been applied to the dataset, you should be able to read these in Pipeline Builder the same way you can in Code Repositories.
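For reference, reading the CSV-backed dataset back in Code Repositories also looks the same as for any other dataset once the schema is applied. A minimal sketch (the paths are hypothetical):

from transforms.api import transform_df, Input, Output


@transform_df(
    Output("path/to/downstream/dataset"),
    csv_dataset=Input("path/to/csv/dataset"),
)
def read_csv_backed(csv_dataset):
    # With a schema applied, the CSV files are exposed as a regular dataframe
    return csv_dataset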

To output to CSV, you can select CSV as the write format in the output dataset's configuration.


Follow-up question: how can you make the column headers come through in the resulting CSV?

In Pipeline Builder, when you select CSV as the write format, there is a toggle to include the header row.
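If you're writing from Code Repositories instead, you can pass writer options through write_dataframe. A minimal sketch, assuming the options dict is forwarded to the Spark CSV writer (paths are hypothetical):

from transforms.api import transform, Input, Output


@transform(
    output=Output("path/to/output/dataset"),
    my_input=Input("path/to/input/dataset"),
)
def csv_with_header(output, my_input):
    # "header" is a standard Spark CSV writer option
    output.write_dataframe(
        my_input.dataframe(),
        output_format="csv",
        options={"header": "true"},
    )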

Is there a way to define the names of the resulting CSV files?

You can rename the CSV files in a transform by using something like:

import datetime
import logging
import shutil

from transforms.api import transform, Input, Output


@transform(
    out=Output("..."),
    source_df=Input("..."),
)
def compute(source_df, out):
    # Get the filesystem handles
    fs_in = source_df.filesystem()
    fs_out = out.filesystem()
    # List the input files
    logging.warning(list(fs_in.ls()))

    # Copy each input file to the output under a date-based name
    for curr_file in fs_in.ls():
        # Figure out the date at which the file was written
        write_ts = curr_file.modified  # milliseconds since epoch
        logging.warning(curr_file.path)
        logging.warning(curr_file.modified)
        timestamp = write_ts / 1000  # Convert from milliseconds to seconds
        date = datetime.datetime.utcfromtimestamp(timestamp).date()
        logging.warning(date)
        target_file_name = f"my_report_{date}.csv"

        # List the files already written to the output
        existing_output_files = [
            existing.path for existing in fs_out.ls()
        ]
        logging.warning(existing_output_files)

        # Append a counter so files with the same date don't overwrite each other
        counter = 0
        while target_file_name in existing_output_files:
            counter += 1
            target_file_name = f"my_report_{date}_{counter}.csv"

        # Copy the raw bytes so the CSV content is preserved exactly
        with fs_in.open(curr_file.path, "rb") as f_in:
            with fs_out.open(target_file_name, "wb") as f_out:
                shutil.copyfileobj(f_in, f_out)
                f_out.flush()  # shutil.copyfileobj does not flush
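One thing to keep in mind: Spark writes one CSV per partition, so the renaming transform above will produce several my_report_* files if the upstream dataset has several part files. If you want exactly one file per run, a sketch like this can help (coalescing to a single partition upstream; the paths and function name are hypothetical):

from transforms.api import transform, Input, Output


@transform(
    output=Output("path/to/single/file/csv"),
    my_input=Input("path/to/input/dataset"),
)
def single_csv(output, my_input):
    # coalesce(1) forces Spark to write a single part file; this funnels the
    # whole dataset through one task, so only use it for modestly sized data
    output.write_dataframe(
        my_input.dataframe().coalesce(1),
        output_format="csv",
        options={"header": "true"},
    )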