I have an object type with an attachment property, and I need to migrate this object type to use media sets, but I don’t want to lose my existing attachments. How can I convert the existing attachments into a media set?
First, pull the metadata for each attachment. This is done by calling the public attachments API from an external transform.
from pyspark.sql import functions as F
from pyspark.sql import types as T
from transforms.api import transform, Input, Output, incremental
from transforms.external.systems import EgressPolicy, use_external_systems, Credential, ExportControl
import json
import requests
import base64
# Placeholder: replace `{STACK URL}` with your stack's hostname before running.
# The value is interpolated into f"https://{STACK}/api/v1/attachments/...", so it
# presumably needs to be a quoted hostname string without the "https://" scheme.
STACK = {STACK URL}
# Media type that an attachment's `mediaType` must equal for `file_acceptance`
# to be set to True.
ACCEPTED_FILE_FORMATS = "application/pdf"
@incremental(semantic_version=8)
@use_external_systems(
    egress=EgressPolicy('ri.resource-policy-manager.global.network-egress-policy.19dc88e7-88dd-4574-bf49-93983e07cf2e'),
    export_control=ExportControl(markings=['90dce2eb-25c6-45a8-baf7-a52eb94d74a5']),
    creds=Credential('ri.credential..credential.9911be19-c610-47b6-b6a1-b19db2c4e73d')
)
@transform(
    attachment_metadata_agg=Output("ri.foundry.main.dataset.dbfe9f70-64e8-49cf-b751-bbdd4721aebd"),
    attachments_to_process=Input("ri.foundry.main.dataset.36bc2358-ef50-4103-a0e0-06fadd77402b"),
)
def compute(ctx, attachments_to_process, attachment_metadata_agg, egress, creds, export_control):
    """Fetch attachment metadata over HTTP and flag which files are accepted.

    For every row of ``attachments_to_process`` the ``attachment_rid`` column is
    sent to ``GET https://{STACK}/api/v1/attachments/{rid}``. The JSON response
    is parsed into ``rid``, ``filename``, ``sizeBytes`` and ``mediaType``
    columns, which are placed in front of the remaining input columns
    (``attachment_rid`` itself is dropped). A boolean ``file_acceptance`` column
    records whether ``mediaType`` equals ``ACCEPTED_FILE_FORMATS``.

    Rows whose metadata could not be fetched, or whose returned ``rid`` does not
    match the requested ``attachment_rid``, are left out of the output.

    Args:
        ctx: Transform context (unused here).
        attachments_to_process: Input dataset with an ``attachment_rid`` column.
        attachment_metadata_agg: Output dataset receiving the metadata rows.
        egress: Egress policy handle (required by the decorator, unused here).
        creds: Credential whose ``key`` entry is used as the bearer token.
        export_control: Export-control handle (required by the decorator,
            unused here).
    """
    token = creds.get('key')
    headers = {"Authorization": f"Bearer {token}"}
    # (connect, read) timeouts in seconds; without them a stalled connection
    # would block the Spark task forever.
    request_timeout = (10, 60)

    def fetch_attachment_metadata(attachment_rid):
        """Return the attachment's metadata as a JSON string, or None on failure."""
        if attachment_rid is None:
            return None
        metadata_url = f"https://{STACK}/api/v1/attachments/{attachment_rid}"
        response = requests.get(metadata_url, headers=headers, timeout=request_timeout)
        if not response.ok:
            # Error payloads carry no usable metadata; returning None makes the
            # row fall out at the rid filter below instead of crashing on a
            # non-JSON error body.
            return None
        return json.dumps(response.json())

    # The UDF performs a network call, so it is declared nondeterministic to
    # stop the optimizer from duplicating or re-ordering its invocations.
    collect_attachment_metadata = F.udf(
        fetch_attachment_metadata, T.StringType()
    ).asNondeterministic()

    json_schema = T.StructType([
        T.StructField("rid", T.StringType()),
        T.StructField("filename", T.StringType()),
        T.StructField("sizeBytes", T.StringType()),
        T.StructField("mediaType", T.StringType()),
    ])

    source_df = attachments_to_process.dataframe()
    # Input columns carried through unchanged; "metadata" and "attachment_rid"
    # are excluded from the output.
    passthrough_columns = [
        name for name in source_df.columns
        if name not in ("metadata", "attachment_rid")
    ]

    # Parse the response next to the row it belongs to instead of self-joining
    # the frame on rid == attachment_rid. The filter keeps the inner-join
    # semantics (rows with no or mismatching rid are dropped) without a shuffle
    # and without multiplying rows when an attachment_rid appears more than once.
    result = (
        source_df
        .withColumn(
            "parsed_metadata",
            F.from_json(collect_attachment_metadata(F.col("attachment_rid")), json_schema),
        )
        .filter(F.col("parsed_metadata.rid") == F.col("attachment_rid"))
        .select("parsed_metadata.*", *passthrough_columns)
    )

    acceptable_files = result.withColumn(
        "file_acceptance",
        F.when(F.col("mediaType") == ACCEPTED_FILE_FORMATS, True).otherwise(False),
    )
    attachment_metadata_agg.write_dataframe(acceptable_files)
Second, upload the attachment data into the media set using the media set upload public API endpoint in another external transform.
from pyspark.sql import functions as F
from pyspark.sql import types as T
from transforms.api import transform, Input, Output, incremental
from transforms.external.systems import EgressPolicy, use_external_systems, Credential, ExportControl
from transforms.mediasets import MediaSetOutput
import requests
import time
# Placeholder: replace `{STACK URL}` with your stack's hostname before running.
# The value is interpolated into f"https://{STACK}/api/v1/attachments/.../content",
# so it presumably needs to be a quoted hostname string without the "https://" scheme.
STACK = {STACK URL}
@incremental(v2_semantics=True)
@use_external_systems(
    egress=EgressPolicy('ri.resource-policy-manager.global.network-egress-policy.19dc88e7-88dd-4574-bf49-93983e07cf2e'),
    export_control=ExportControl(markings=['90dce2eb-25c6-45a8-baf7-a52eb94d74a5']),
    creds=Credential('ri.credential..credential.9911be19-c610-47b6-b6a1-b19db2c4e73d')
)
@transform(
    attachment_content=MediaSetOutput("ri.mio.main.media-set.6b4fb9d5-c9fb-4962-a543-18561c78e9ac", should_snapshot=True),
    logging=Output("ri.foundry.main.dataset.cb6f9875-09ab-4cea-a8b7-9f7cb5f32a1f"),
    attachments_to_process=Input("ri.foundry.main.dataset.dbfe9f70-64e8-49cf-b751-bbdd4721aebd")
)
def compute(ctx, egress, creds, export_control, attachment_content, attachments_to_process, logging):
    """Copy the content of every accepted attachment into the media set.

    Rows of ``attachments_to_process`` with a true ``file_acceptance`` are
    processed one by one: the attachment content is streamed from
    ``GET https://{STACK}/api/v1/attachments/{rid}/content`` and written to
    ``attachment_content`` with ``put_media_item`` under the row's filename
    (with all whitespace removed). The outcome of each row is written to the
    ``logging`` dataset as ``status`` ("SUCCESS" or "FAILED") and ``msg`` (the
    media item RID on success, the error text on failure).

    Args:
        ctx: Transform context (unused here).
        egress: Egress policy handle (required by the decorator, unused here).
        creds: Credential whose ``key`` entry is used as the bearer token.
        export_control: Export-control handle (required by the decorator,
            unused here).
        attachment_content: Media set output receiving the attachment files.
        attachments_to_process: Input dataset with ``rid``, ``filename`` and
            ``file_acceptance`` columns.
        logging: Output dataset receiving one status row per attachment.
    """
    token = creds.get('key')
    headers = {"Authorization": f"Bearer {token}"}
    # (connect, read) timeouts in seconds; the read timeout bounds the wait
    # between chunks of the stream, not the total download time.
    request_timeout = (10, 300)

    udf_return_type = T.StructType([
        T.StructField('status', T.StringType()),
        T.StructField('msg', T.StringType())
    ])

    def upload_attachment_content(r, fname):
        """Stream one attachment into the media set; return (status, msg)."""
        content_url = f"https://{STACK}/api/v1/attachments/{r}/content"
        try:
            # The context manager releases the connection even when the
            # upload fails part-way through the stream.
            with requests.get(
                content_url, headers=headers, stream=True, timeout=request_timeout
            ) as response:
                # Without this check an error payload (404/500 body) would be
                # uploaded as if it were the file and reported as SUCCESS.
                response.raise_for_status()
                response.raw.decode_content = True  # need to set flag to true to avoid gzip from stream
                resp = attachment_content.put_media_item(response.raw, fname)
            return "SUCCESS", resp.media_item_rid
        except Exception as e:
            # Deliberately broad: a failure on one attachment is recorded in
            # the logging dataset instead of failing the whole build.
            return "FAILED", f"{e}"

    # The UDF uploads data as a side effect, so it is declared nondeterministic
    # to stop the optimizer from invoking it more than once per row when the
    # struct fields are extracted below.
    collect_attachment_content = F.udf(
        upload_attachment_content, udf_return_type
    ).asNondeterministic()

    df = attachments_to_process.dataframe()
    df = df.filter(F.col("file_acceptance"))
    df = df.select("rid", "filename", "file_acceptance")
    df = df.withColumn("filename", F.regexp_replace("filename", "\\s+", ""))
    df = df.withColumn('output', collect_attachment_content(F.col('rid'), F.col('filename')))
    df = df.withColumn('status', F.col("output.status"))
    df = df.withColumn('msg', F.col("output.msg"))
    df = df.drop("output")
    logging.write_dataframe(df)