Serverless distributed processing with BigFrames

BigFrames, a library recently introduced by Google Cloud, marks a significant advancement in data processing.

BigFrames is in Preview at the time of writing.

Now, let's look at this in more detail.

Current Challenges in Distributed Processing

In today's complex data landscape, we frequently deal with distributed workloads that involve large datasets. These tasks often include data enrichment, filtering, and intricate transformations governed by advanced rules. The workloads usually combine SQL-based tasks with Python for the more sophisticated transformations. The job isn't complete until the results can be shared with others, which typically means exporting to Google Cloud Storage (GCS) or passing the data to downstream services through a messaging system such as Cloud Pub/Sub.

BigFrames - What problem are we trying to solve
Click to watch on YouTube: what problem are we trying to solve

While these tasks can be executed using existing tools like DBT, BigQuery, or Airflow, they tend to complicate the processing workflow. The traditional method often involves using Airflow for orchestration and Cloud Dataflow for processing. This approach is complicated by the steep learning curve of Apache Beam, which runs on Dataflow. Moreover, the deployment process in this setup is not straightforward, requiring complex packaging and the management of a range of dependencies.

What is BigFrames

My definition is this:

It is an easy-to-learn, completely serverless, distributed compute orchestration framework for batch processing.

This library simplifies tasks traditionally handled by complex technologies like Apache Beam (on Dataflow) or Spark. It bridges the gap between local Pandas operations on Jupyter and deploying large-scale production workloads, enabling faster and more interactive development at scale.

BigFrames leverages BigQuery's serverless compute and uses Remote Functions on Cloud Run or Cloud Functions, providing a simpler, more modular method for managing tasks before embarking on machine learning workloads. This launch represents a major step forward in data processing, machine learning, distributed computing, and orchestration within the Google Cloud ecosystem.
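
To make this concrete, here is a minimal sketch of the entry point, assuming made-up project, dataset, and table names: you import bigframes.pandas in place of pandas and read straight from a BigQuery table.

import bigframes.pandas as bpd

# Point BigFrames at a project and location (placeholder values)
bpd.options.bigquery.project = "my-gcp-project"
bpd.options.bigquery.location = "US"

# Read a BigQuery table into a BigFrames DataFrame; the data stays in BigQuery
# and Pandas-style operations are compiled down to SQL
df = bpd.read_gbq("my-gcp-project.my_dataset.trips")
df.head()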

A real-world BigFrames use case

Consider the following flow

Click to watch on YouTube: How does it work?
💡
Now consider the steps involved in doing this, and how many services you might need to deploy independently using a more conventional approach:
  1. Data models are provided in either a single or multiple BigQuery tables.
  2. Execution of SQL-based transformations is necessary.
  3. Additionally, more intricate transformations are required, calling for Python usage, often along with extra dependencies.
  4. Encryption of specific data columns, particularly those containing Personally Identifiable Information (PII), is required.
  5. The processed data must be dispatched to one or two destinations: a GCS bucket and/or a Cloud Pub/Sub message queue.

Now let's break down exactly how it works.

Click to watch on YouTube: Why should we care?

So,

SQL-based transformations are expressed with the Pandas API, but execution is delegated to BigQuery, which means scaling is no longer an issue.

df["day_of_week"] = df["day_of_week"].map(
    {
        2: "Monday", 3: "Tuesday", 4: "Wednesday", 5: "Thursday", 6: "Friday",
        7: "Saturday", 1: "Sunday"
    }
)

More complex transformations are done using annotated Python functions, again via the Pandas library, with execution delegated to Remote Functions.

@bpd.remote_function(
    [int],
    str,
    bigquery_connection="bigframes-rf-conn",
    reuse=True,
    packages=[],
)
def get_mapped_duration(x):
    if x < 120:
        return 'under 2 hours'
    elif x < 240:
        return '2-4 hours'
    elif x < 360:
        return '4-6 hours'
    elif x < 480:
        return '6-8 hours'
    elif 480 <= x < 1440:
        return '8-24 hours'
    else:
        return '> 24 hours'
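
Applying such a function then reads like a regular Pandas apply; the column names below are assumptions for illustration:

# Apply the Remote Function to a column; BigFrames routes the rows through the
# deployed Cloud Function / Cloud Run endpoint and collects the results
df["duration_category"] = df["duration_minutes"].apply(get_mapped_duration)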

Encryption requires additional dependencies to be installed; these are declared on the annotated Python functions, with execution again delegated to Remote Functions.

@bpd.remote_function(
    [str],
    str,
    bigquery_connection="bigframes-rf-conn",
    reuse=True,
    packages=["cryptography"],
)
def get_encrypted(x):
    from cryptography.fernet import Fernet

    # handle missing value
    if x is None:
        x = ""
    else:
        x = str(x)

    # NOTE: a fresh key is generated on every call and then discarded, so the
    # ciphertext cannot be decrypted later; fine for a demo, but a managed key
    # (e.g. from Secret Manager or Cloud KMS) would be needed in practice
    key = Fernet.generate_key()
    f = Fernet(key)
    return f.encrypt(x.encode()).decode()

Finally, the transformed data can be saved directly into a GCS bucket or sent as events to Cloud Pub/Sub, all in the same place, by extracting the data out of the Pandas DataFrame.
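
A rough sketch of that last step, assuming a hypothetical bucket and Pub/Sub topic and that the export goes through to_csv with a gs:// path, might look like this:

import json
from google.cloud import pubsub_v1

# Export straight from BigQuery to Cloud Storage (the wildcard lets BigQuery
# shard the output across files)
df.to_csv("gs://my-output-bucket/trips_enriched/*.csv")

# Pull the (already reduced) result into memory and publish each row as an event
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-gcp-project", "trips-enriched")

for record in df.to_pandas().to_dict(orient="records"):
    publisher.publish(topic_path, data=json.dumps(record, default=str).encode("utf-8"))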

The best part is that all the steps mentioned here only require a single Cloud Run Job to be deployed, which manages the creation of all resources and the execution itself.

The whole example's source code can be found on GitHub, along with a demo and a code walkthrough, if you are interested.

BigFrames pros and cons

OK, let's summarise the pros and cons, with some ideas as food for thought. For a more detailed explanation of these points, head to the last section of the video.

👍
Pros
  • Orchestration of complex transformations using BigQuery and Remote Functions
  • End-to-end serverless stack
  • Dynamic creation of Remote Functions, enabling a faster development cycle
👎
Cons
  • Cannot configure dynamically created Remote Functions
  • No easy way to maintain dynamically created Remote Functions
  • The ingress rule is public for dynamically created Remote Functions
💡
Ideas
  • Interactive development at scale in Jupyter Notebooks
  • Swap Pandas to BigFrames at ease
  • .to_pandas() for lightweight processing (see the sketch below)
  • Connects to ML workloads
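
As a minimal sketch of the swap and .to_pandas() ideas above (the table and column names are made up), the heavy aggregation runs in BigQuery and only the small result is materialised locally:

import bigframes.pandas as bpd

df = bpd.read_gbq("my-gcp-project.my_dataset.trips")

# The aggregation itself is executed in BigQuery...
daily = df.groupby("day_of_week")["duration_minutes"].mean()

# ...and only the small result lands in local memory for lightweight processing
local_series = daily.to_pandas()
print(local_series.head())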

#BigFrames #RemoteFunctions #DistributedCompute