I'm trying to write Python transform logic that uses an API source from Data Connection to download PDFs from a website and save them to a media set.
Here is my code
from transforms.api import transform, Input, Output
from transforms.external.systems import external_systems, Source
from transforms.mediasets import MediaSetOutput
import os
import logging
from pyspark.sql.functions import size, col
logger = logging.getLogger(__name__)
@external_systems(
    nhtsawebsite_source=Source("ri.magritte..source.96de0262-bff5-474e-8f63-113eeca497f4")
)
@transform(
    output_mediaset=MediaSetOutput("/AWS/AWS_Bootcamp/Bootcamp_resources/NHTSAandCARProject/RawData/DatafromNHTSA/TSBDocfromNHTSA"),
    source_df=Input("ri.foundry.main.dataset.fc7d6f27-38ab-4bfd-8e38-9832b9ba5821"),
)
def compute(nhtsawebsite_source, source_df, output_mediaset):
    """Download the PDFs referenced by the input dataset and upload them to the media set.

    Rows whose ``llmTSB_URLs`` column is null or empty are dropped. For the
    first five remaining rows, every URL that starts with the source's base
    URL is fetched through the source's HTTPS client and uploaded under the
    name ``<campaign id>_<basename of the URL>``.

    Args:
        nhtsawebsite_source: External-systems source providing the HTTPS
            connection (client and base URL).
        source_df: Transform input with the ``llmNHTSA_Campaign_ID`` and
            ``llmTSB_URLs`` columns.
        output_mediaset: Media set output that receives the downloaded PDFs.

    Returns:
        The filtered two-column dataframe. Kept for backward compatibility
        with the original snippet.
        NOTE(review): confirm whether ``@transform`` uses this return value.

    A failure on one URL is logged with its traceback and does not stop the
    remaining uploads.
    """
    # Local import so this block stays self-contained; the file's import
    # header does not bring in ``io``.
    import io

    df = source_df.dataframe()
    # Keep only rows that actually carry at least one URL.
    df = df.filter(col("llmTSB_URLs").isNotNull() & (size(col("llmTSB_URLs")) > 0))
    df = df.select("llmNHTSA_Campaign_ID", "llmTSB_URLs")

    # Resolve the connection once and reuse it for both the client and the URL.
    connection = nhtsawebsite_source.get_https_connection()
    client = connection.get_client()
    nhtsabaseurl = connection.url

    rows = df.limit(5).collect()  # Process only the first 5 rows
    for row in rows:
        campaign_id = row["llmNHTSA_Campaign_ID"]
        tsb_urls = row["llmTSB_URLs"]  # Already a list, no need for eval()
        # Only URLs under the source's base URL are fetched with this client.
        filtered_urls = [url for url in tsb_urls if url.startswith(nhtsabaseurl)]
        for url in filtered_urls:
            try:
                response = client.get(url)
                response.raise_for_status()
                pdf_filename = os.path.basename(url)
                output_filename = f"{campaign_id}_{pdf_filename}"
                # put_media_item expects a file-like object, not raw bytes.
                # Passing response.content directly raised inside this try and
                # was swallowed below, so nothing reached the media set.
                output_mediaset.put_media_item(
                    io.BytesIO(response.content), output_filename
                )
                logger.info(f"Uploaded PDF to MediaSet: {output_filename}")
            except Exception as e:
                # Best effort per URL, but keep the traceback so failures are
                # visible in the build logs instead of silently disappearing.
                logger.exception(
                    "Error uploading PDF from %s to MediaSet: %s", url, e
                )
    logger.info("Finished uploading PDFs to MediaSet")
    # Return the processed dataframe
    return df
The transform executes, but I don't see any files in the media set.
When I try to store the PDFs in a dataset, it works fine.
Have you tried initializing the media set? If you create the media set in the project folder and then point to it using its RID, do you still get the same issue?
Yes, the media set has to be present before running the transform. It's documented:
Unlike regular datasets, media sets must already exist before being used as an output. This can be done through the Foundry filesystem interface by navigating to the folder where you want the media set to exist.
@maddyAWS Can you share the code where you were saving the PDF file(s) directly to a dataset? Were you writing response.content directly into the dataset output?
Also, were you able to write it directly to a MediaSet output?
I’m actually stuck on this as well, would appreciate your help!
The `put_media_item` function takes a file-like object as input. So, something like this should work if you want to write directly to the media set from your external transform:
def compute(nhtsawebsite_source, source_df, output_mediaset):
    """Download the PDFs referenced by the input dataset and upload them to the media set.

    Rows whose ``llmTSB_URLs`` column is null or empty are dropped. For the
    first five remaining rows, every URL that starts with the source's base
    URL is fetched, written to a temporary file, and uploaded under the name
    ``<campaign id>_<basename of the URL>``.

    Args:
        nhtsawebsite_source: External-systems source providing the HTTPS
            connection (client and base URL).
        source_df: Transform input with the ``llmNHTSA_Campaign_ID`` and
            ``llmTSB_URLs`` columns.
        output_mediaset: Media set output that receives the downloaded PDFs.

    A failure on one URL is logged with its traceback and does not stop the
    remaining uploads.
    """
    # Local import: without it, tempfile raises NameError inside the try
    # below, which the broad except would swallow on every URL.
    import tempfile

    df = source_df.dataframe()
    # Keep only rows that actually carry at least one URL.
    df = df.filter(col("llmTSB_URLs").isNotNull() & (size(col("llmTSB_URLs")) > 0))
    df = df.select("llmNHTSA_Campaign_ID", "llmTSB_URLs")

    client = nhtsawebsite_source.get_https_connection().get_client()
    nhtsabaseurl = nhtsawebsite_source.get_https_connection().url

    rows = df.limit(5).collect()  # Process only the first 5 rows
    for row in rows:
        campaign_id = row["llmNHTSA_Campaign_ID"]
        tsb_urls = row["llmTSB_URLs"]  # Already a list, no need for eval()
        # Only URLs under the source's base URL are fetched with this client.
        filtered_urls = [url for url in tsb_urls if url.startswith(nhtsabaseurl)]
        for url in filtered_urls:
            try:
                response = client.get(url)
                response.raise_for_status()
                pdf_filename = os.path.basename(url)
                output_filename = f"{campaign_id}_{pdf_filename}"
                with tempfile.NamedTemporaryFile() as tmp:
                    tmp.write(response.content)
                    # Flush so the second handle opened below sees every byte.
                    tmp.flush()
                    with open(tmp.name, 'rb') as tmp_read:
                        # put_media_item takes a file-like object positioned
                        # at the start of the PDF.
                        output_mediaset.put_media_item(tmp_read, path=output_filename)
                logger.info(f"Uploaded PDF to MediaSet: {output_filename}")
            except Exception as e:
                # Best effort per URL, but keep the traceback so failures are
                # visible in the build logs instead of silently disappearing.
                logger.exception(
                    "Error uploading PDF from %s to MediaSet: %s", url, e
                )
    logger.info("Finished uploading PDFs to MediaSet")