I would like to load some data in memory (e.g. while doing an API call) then process this data, then dump it to disk on the output dataset, and then repeat the operation - to keep the burden on the memory low.
Is it possible?
2 Likes
A quick test with
import polars as pl
from transforms.api import (
LightweightOutput,
Output,
lightweight,
transform,
)
@lightweight(cpu_cores=1, memory_gb=2)
@transform(
    output=Output("/path/to/test_multi_write"),
)
def test_multi_write(
    output: LightweightOutput,
) -> None:
    """Demonstrate that repeated write_table calls overwrite the output.

    Builds three 10-row chunks and hands each one to write_table in turn.
    Because write_table replaces the whole output dataset, only the last
    chunk survives after the transform finishes.
    """
    schema = {"id": pl.Int64, "chunk": pl.Int64, "value": pl.Utf8}
    for part in range(1, 4):
        base = (part - 1) * 10
        chunk_df = pl.DataFrame(
            {
                "id": [base + offset + 1 for offset in range(10)],
                "chunk": [part] * 10,
                "value": [f"chunk_{part}_row_{i}" for i in range(10)],
            },
            schema=schema,
        )
        # Each call replaces the previous contents of the output dataset.
        output.write_table(chunk_df)
shows that, indeed, the output is overwritten on each write_table call.
However,
import polars as pl
from transforms.api import (
LightweightOutput,
Output,
lightweight,
transform,
)
@lightweight(cpu_cores=1, memory_gb=2)
@transform(
    output=Output("/path/to/test_multi_write_files"),
)
def test_multi_write(
    output: LightweightOutput,
) -> None:
    """Write each chunk to its own parquet file on the output filesystem.

    Unlike write_table, writing distinct files through output.filesystem()
    accumulates results: each chunk lands in chunk_<n>.parquet and none of
    the previously written files is overwritten.
    """
    schema = {"id": pl.Int64, "chunk": pl.Int64, "value": pl.Utf8}
    for part in range(1, 4):
        lo = (part - 1) * 10 + 1
        chunk_df = pl.DataFrame(
            {
                "id": list(range(lo, lo + 10)),
                "chunk": [part] * 10,
                "value": [f"chunk_{part}_row_{i}" for i in range(10)],
            },
            schema=schema,
        )
        # One file per chunk; filenames differ, so nothing gets clobbered.
        with output.filesystem().open(f"chunk_{part}.parquet", "wb") as sink:
            chunk_df.write_parquet(sink)
will indeed write multiple different files that don’t overwrite each other.
1 Like
And if you call put_metadata() on the output after the files are written, you also get schema inference.
1 Like