I am syncing CSV files from S3.
The files are comma-delimited, but in rare cases a “,” appears inside a data value itself.
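For example (an illustrative row, not my actual data), a 6-column row such as
1,Smith, John,123 Main St,Springfield,12345,US
has an extra comma inside the name field, so the column count no longer matches the schema.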
Currently, I am checking manually by following the steps below.
Target Dataset > Edit Schema > Save and validate
Is it possible to automatically detect schema errors?
Hi,
If you want to detect those errors automatically, you’ll need to keep your ingest as raw (schemaless) files, and then run a downstream transform that reads the files one by one and checks each line for extra commas.
For example, if you’re expecting 5 commas per row:
from transforms.api import transform, Input, Output

@transform(
    output=Output("output_dataset"),
    input=Input("input_dataset"),
)
def compute(ctx, input, output):
    # List the raw files in the input dataset
    input_files = input.filesystem().ls()

    # Open a single output file for the cleaned rows
    with output.filesystem().open('cleaned_data.csv', 'w') as output_file:
        for file_info in input_files:
            # Open each input file and check it line by line
            with input.filesystem().open(file_info.path, 'r') as input_file:
                for line in input_file:
                    # Write the line to the output only if it has exactly 5 commas
                    if line.count(',') == 5:
                        output_file.write(line)
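If you also want to surface the bad rows instead of just dropping them, you could route them to a second file in the same output (a sketch under the same assumptions; the bad_rows.csv name is just illustrative):

    # Inside the same compute function, writing good and bad rows to separate files
    with output.filesystem().open('cleaned_data.csv', 'w') as good_file, \
         output.filesystem().open('bad_rows.csv', 'w') as bad_file:
        for file_info in input.filesystem().ls():
            with input.filesystem().open(file_info.path, 'r') as input_file:
                for line in input_file:
                    # Route each line based on its comma count
                    (good_file if line.count(',') == 5 else bad_file).write(line)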
If your files are large and you need more efficiency, you could also parallelize the work by reading them into a DataFrame and leveraging Spark, something like:
from pyspark.sql import functions as F

spark = ctx.spark_session
# Read the raw CSV lines as text (one row per line in a "value" column),
# so malformed rows aren't reshaped by the CSV parser before we can check them
df = spark.read.text(input.filesystem().hadoop_path)
# Define a UDF to count commas in a raw line
count_commas = F.udf(lambda line: line.count(','), 'int')
# Keep only the rows with exactly 5 commas
filtered_df = df.filter(count_commas(F.col('value')) == 5)
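Then, still inside the transform's compute function, you can write the result back with output.write_dataframe (a sketch, assuming the same input/output names as above):

    # Write only the well-formed rows to the output dataset
    output.write_dataframe(filtered_df)

Note that filtered_df still has a single string column ("value"), so you would reapply the schema on the output dataset or split the column into fields afterwards.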