How do I convert an attachment to a media set?

I have an object type with an attachment property, and I need to migrate this object type to use media sets, but I don’t want to lose my existing attachments. How can I convert the existing attachments into a media set?

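First, pull each attachment's metadata (RID, filename, size in bytes, and media type) by calling the Foundry public API (`/api/v1/attachments/{rid}`) from an external transform. The transform below also adds a boolean `file_acceptance` column that marks which attachments have an accepted media type (PDF in this example).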

from pyspark.sql import functions as F
from pyspark.sql import types as T
from transforms.api import transform, Input, Output, incremental
from transforms.external.systems import EgressPolicy, use_external_systems, Credential, ExportControl
import json
import requests
import base64

# Hostname of your Foundry stack WITHOUT a scheme: the request URLs below are
# built as f"https://{STACK}/...". Replace this placeholder before running.
STACK = "<your-foundry-hostname>"
# Media type an attachment must have to be flagged as accepted for migration
# (compared against the "mediaType" field returned by the Attachments API).
ACCEPTED_FILE_FORMATS = "application/pdf"


@incremental(semantic_version=8)
@use_external_systems(
    egress=EgressPolicy('ri.resource-policy-manager.global.network-egress-policy.19dc88e7-88dd-4574-bf49-93983e07cf2e'),
    export_control=ExportControl(markings=['90dce2eb-25c6-45a8-baf7-a52eb94d74a5']),
    creds=Credential('ri.credential..credential.9911be19-c610-47b6-b6a1-b19db2c4e73d')
)
@transform(
    attachment_metadata_agg=Output("ri.foundry.main.dataset.dbfe9f70-64e8-49cf-b751-bbdd4721aebd"),
    attachments_to_process=Input("ri.foundry.main.dataset.36bc2358-ef50-4103-a0e0-06fadd77402b"),
)
def compute(ctx, attachments_to_process, attachment_metadata_agg, egress, creds, export_control):
    """Fetch metadata for every attachment RID and flag files with an accepted media type.

    For each row of ``attachments_to_process`` this calls
    ``GET https://{STACK}/api/v1/attachments/{attachment_rid}`` and writes one
    output row with the columns ``rid``, ``filename``, ``sizeBytes``,
    ``mediaType``, every other input column (except ``attachment_rid``), and a
    boolean ``file_acceptance``.

    Rows whose metadata request fails are kept (with null metadata and
    ``file_acceptance = False``) instead of being silently dropped; because the
    transform is incremental, a dropped row would never be seen again.

    ``ctx``, ``egress`` and ``export_control`` are supplied by the decorators
    and are not used in the body.
    """
    # requests has no default timeout; without one a hung call stalls the build.
    request_timeout_seconds = 60
    headers = {"Authorization": f"Bearer {creds.get('key')}"}

    def _fetch_metadata(attachment_rid):
        """Return the attachment's metadata as a JSON string, or None if the lookup fails."""
        if attachment_rid is None:
            return None
        url = f"https://{STACK}/api/v1/attachments/{attachment_rid}"
        try:
            response = requests.get(url, headers=headers, timeout=request_timeout_seconds)
            # Never parse an HTTP error body as if it were attachment metadata.
            response.raise_for_status()
            return json.dumps(response.json())
        except (requests.RequestException, ValueError):
            # ValueError covers a non-JSON body; a null yields null metadata columns.
            return None

    # The UDF does network I/O, so mark it nondeterministic: Spark may otherwise
    # invoke a deterministic UDF more times than it appears in the query.
    collect_attachment_metadata = F.udf(_fetch_metadata, T.StringType()).asNondeterministic()

    json_schema = T.StructType([
        T.StructField("rid", T.StringType()),
        T.StructField("filename", T.StringType()),
        T.StructField("sizeBytes", T.StringType()),
        T.StructField("mediaType", T.StringType()),
    ])

    df = attachments_to_process.dataframe()
    passthrough_columns = [c for c in df.columns if c not in ("attachment_rid", "metadata")]

    # Parse the metadata next to the original row instead of re-joining df with a
    # projection of itself: the self-join re-ran the API call, fanned out on
    # duplicate RIDs and dropped every row whose lookup failed.
    parsed = df.withColumn(
        "parsed_metadata",
        F.from_json(collect_attachment_metadata(F.col("attachment_rid")), json_schema),
    )
    result = parsed.select(
        # Taken from the input so rows whose lookup failed stay identifiable.
        F.col("attachment_rid").alias("rid"),
        *[F.col(f"parsed_metadata.{field.name}") for field in json_schema.fields if field.name != "rid"],
        *passthrough_columns,
    )

    # Accept either a single media type string or a collection of them.
    if isinstance(ACCEPTED_FILE_FORMATS, str):
        accepted_formats = [ACCEPTED_FILE_FORMATS]
    else:
        accepted_formats = list(ACCEPTED_FILE_FORMATS)

    acceptable_files = result.withColumn(
        "file_acceptance",
        # A null mediaType (failed lookup) must give False rather than null.
        F.coalesce(F.col("mediaType").isin(accepted_formats), F.lit(False)),
    )
    attachment_metadata_agg.write_dataframe(acceptable_files)

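Second, in another external transform that reads the metadata dataset produced above, download the content of each accepted attachment from the public API (`/api/v1/attachments/{rid}/content`) and write it into the media set with `MediaSetOutput.put_media_item`. The outcome of each upload (the new media item RID, or the error message) is written to a logging dataset.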

from pyspark.sql import functions as F
from pyspark.sql import types as T
from transforms.api import transform, Input, Output, incremental
from transforms.external.systems import EgressPolicy, use_external_systems, Credential, ExportControl
from transforms.mediasets import MediaSetOutput
import requests
import time

# Hostname of your Foundry stack WITHOUT a scheme: the request URL below is
# built as f"https://{STACK}/...". Replace this placeholder before running.
STACK = "<your-foundry-hostname>"

@incremental(v2_semantics=True)
@use_external_systems(
    egress=EgressPolicy('ri.resource-policy-manager.global.network-egress-policy.19dc88e7-88dd-4574-bf49-93983e07cf2e'),
    export_control=ExportControl(markings=['90dce2eb-25c6-45a8-baf7-a52eb94d74a5']),
    creds=Credential('ri.credential..credential.9911be19-c610-47b6-b6a1-b19db2c4e73d')
)
@transform(
    attachment_content=MediaSetOutput("ri.mio.main.media-set.6b4fb9d5-c9fb-4962-a543-18561c78e9ac", should_snapshot=True),
    logging=Output("ri.foundry.main.dataset.cb6f9875-09ab-4cea-a8b7-9f7cb5f32a1f"),
    attachments_to_process=Input("ri.foundry.main.dataset.dbfe9f70-64e8-49cf-b751-bbdd4721aebd")
)
def compute(ctx, egress, creds, export_control, attachment_content, attachments_to_process, logging):
    """Copy every accepted attachment's content into the media set and log the outcome.

    For each row of ``attachments_to_process`` whose ``file_acceptance`` is true,
    streams ``GET https://{STACK}/api/v1/attachments/{rid}/content`` into
    ``attachment_content.put_media_item``, using ``filename`` with all whitespace
    removed as the item path. Writes ``rid``, ``filename``, ``file_acceptance``,
    ``status`` ("SUCCESS" or "FAILED") and ``msg`` (the new media item RID, or
    the error text) to ``logging``.

    ``ctx``, ``egress`` and ``export_control`` are supplied by the decorators
    and are not used in the body.
    """
    # With stream=True this bounds the connect and each read, not the whole download.
    request_timeout_seconds = 60
    headers = {"Authorization": f"Bearer {creds.get('key')}"}

    upload_result_schema = T.StructType([
        T.StructField('status', T.StringType()),
        T.StructField('msg', T.StringType())
    ])

    def _copy_attachment(r, fname):
        """Stream one attachment into the media set; return (status, media item RID or error)."""
        try:
            content_url = f"https://{STACK}/api/v1/attachments/{r}/content"
            # `with` closes the streamed response so the connection is released.
            with requests.get(content_url, headers=headers, stream=True,
                              timeout=request_timeout_seconds) as response:
                # Without this, an HTTP error body would be uploaded as the file
                # and reported as SUCCESS.
                response.raise_for_status()
                response.raw.decode_content = True  # need to set flag to true to avoid gzip from stream
                resp = attachment_content.put_media_item(response.raw, fname)
            return "SUCCESS", resp.media_item_rid
        except Exception as e:
            # Deliberate best-effort: record the failure per row instead of failing the build.
            return "FAILED", f"{e}"

    # The UDF uploads media items, so it must be nondeterministic: Spark may
    # otherwise invoke a deterministic UDF more than once per row (e.g. once per
    # extracted struct field below), creating duplicate uploads.
    collect_attachment_content = F.udf(_copy_attachment, upload_result_schema).asNondeterministic()

    df = attachments_to_process.dataframe()
    df = df.filter(F.col("file_acceptance"))
    df = df.select("rid", "filename", "file_acceptance")
    df = df.withColumn("filename", F.regexp_replace("filename", "\\s+", ""))
    df = df.withColumn('output', collect_attachment_content(F.col('rid'), F.col('filename')))
    df = df.withColumn('status', F.col("output.status"))
    df = df.withColumn('msg', F.col("output.msg"))
    df = df.drop("output")

    logging.write_dataframe(df)