I would like to load some data in memory (e.g. while doing an API call) then process this data, then dump it to disk on the output dataset, and then repeat the operation - to keep the burden on the memory low.
Is it possible?
2 Likes
A quick test with
import polars as pl
from transforms.api import (
LightweightOutput,
Output,
lightweight,
transform,
)
@lightweight(cpu_cores=1, memory_gb=2)
@transform(
    output=Output("/path/to/test_multi_write"),
)
def test_multi_write(
    output: LightweightOutput,
) -> None:
    """Write three 10-row chunks in a loop, calling write_table once per chunk.

    Demonstrates that each write_table call replaces the dataset contents,
    so only the last chunk survives.
    """
    schema = {"id": pl.Int64, "chunk": pl.Int64, "value": pl.Utf8}
    for part in range(1, 4):
        # Chunk `part` covers ids (part-1)*10+1 .. part*10.
        first_id = (part - 1) * 10 + 1
        frame = pl.DataFrame(
            {
                "id": list(range(first_id, part * 10 + 1)),
                "chunk": [part] * 10,
                "value": [f"chunk_{part}_row_{idx}" for idx in range(10)],
            },
            schema=schema,
        )
        output.write_table(frame)
shows that, indeed, the output is overwritten on each write_table call.
However,
import polars as pl
from transforms.api import (
LightweightOutput,
Output,
lightweight,
transform,
)
@lightweight(cpu_cores=1, memory_gb=2)
@transform(
    output=Output("/path/to/test_multi_write_files"),
)
def test_multi_write(
    output: LightweightOutput,
) -> None:
    """Write each 10-row chunk to its own parquet file on the output filesystem.

    Unlike repeated write_table calls, distinct file names accumulate in the
    output dataset instead of overwriting each other.
    """
    schema = {"id": pl.Int64, "chunk": pl.Int64, "value": pl.Utf8}
    for part in range(1, 4):
        first_id = (part - 1) * 10 + 1
        frame = pl.DataFrame(
            {
                "id": list(range(first_id, part * 10 + 1)),
                "chunk": [part] * 10,
                "value": [f"chunk_{part}_row_{idx}" for idx in range(10)],
            },
            schema=schema,
        )
        # One file per chunk; names differ, so earlier files are preserved.
        with output.filesystem().open(f"chunk_{part}.parquet", "wb") as sink:
            frame.write_parquet(sink)
Will indeed write multiple different files that don’t overwrite each other.
1 Like
And if you call put_metadata() on the output after the files are written, you also get schema inference.
1 Like
In case you need to write multiple files, you can also use LazyFrames and sink parquet with a partition-by (Polars 1.37+) statement.
Some functions are flagged as unstable, so they may change in the future.
import polars as pl
from transforms.api import (
LightweightOutput,
Output,
transform,
)
from typing import Iterator
@transform.using(
    output=Output("/path/to/test_multi_write"),
).with_resources(
    cpu_cores=1,
    memory_gb=2
)
def test_multi_write(
    output: LightweightOutput,
) -> None:
    """Build chunks lazily and sink them as partitioned parquet files.

    Chunks are produced one at a time by a generator, concatenated as a
    LazyFrame (no computation yet), then written with sink_parquet using the
    streaming engine to keep memory pressure low.
    """
    # Explicit schema applied to every chunk so all frames concatenate cleanly.
    schema = {"id": pl.Int64, "chunk": pl.Int64, "value": pl.Utf8}
    chunk_nums = range(1, 4)

    def generator(chunk_num: int) -> Iterator[pl.DataFrame]:
        # Yield one 10-row DataFrame for this chunk; as a generator, only one
        # chunk is materialized at a time.
        df = pl.DataFrame(
            {
                "id": list(range((chunk_num - 1) * 10 + 1, chunk_num * 10 + 1)),
                "chunk": [chunk_num] * 10,
                "value": [f"chunk_{chunk_num}_row_{i}" for i in range(10)],
            },
            schema=schema,
        )
        yield df

    # Vertical concat of all chunks as a LazyFrame — still no computation here.
    lf: pl.LazyFrame = pl.concat(
        (df.lazy() for cn in chunk_nums for df in generator(cn)),
        how="vertical",
    )
    # NOTE(review): the partitioned-sink API and several of these options are
    # flagged unstable upstream — confirm the class name and parameters
    # (e.g. pl.PartitionBy, approximate_bytes_per_file) against the installed
    # Polars version (1.37+ per this post).
    lf.sink_parquet(
        pl.PartitionBy(
            output.path_for_write_table,
            key="chunk",  # optional for this task
            include_key=True,  # optional for this task
            # max_rows_per_file=500000,
            approximate_bytes_per_file=100000000,
        ),
        compression='zstd',  # smaller than snappy
        mkdir=True,  # needed for a proper write in Foundry
        engine="streaming",  # reduce memory pressure
    )
    # Register the written files with the Foundry output dataset.
    output.write_table(output.path_for_write_table)
Hints for Polars in Foundry:
- Use LazyFrames
- Use the Enum datatype if possible; it reduces memory pressure, especially in complex transforms or in nested datatypes like List or Struct.
- In case your dataset results in an Object Set, enforce a sort on your List column; otherwise your index compute may explode.
- Set your compute engine to “streaming”.
1 Like