Serverless Data Fetching stack with OpenFaaS & OpenFlow

The goal of this post is to build a serverless app that can answer questions about data. The data will be fetched from a CSV file and formatted on the fly at runtime.
The serverless part will be handled by OpenFaaS.
The data flow will be handled by OpenFlow, a Python library that aims to ease the management of dataflows within applications. It uses pandas DataFrames as its primary tool.


Data source

Install the dependencies

pip install openflow

To use OpenFlow, you need to define a fetch() function. This function will fetch the data from the source of your choice. In this example, the source will be this CSV file containing a list of movies.

The fetch() function has to return a pandas.DataFrame instance.

from datetime import date

import pandas as pd
from openflow import DataSource

url = 'https://raw.githubusercontent.com/fivethirtyeight/data/master/bechdel/movies.csv'
fetch = lambda _: pd.read_csv(url)

More complex examples of fetch() functions can be found here. They show how to use Postgres and Mongo databases as DataSources.
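
To give a rough idea of what a database-backed fetch() could look like, here is a minimal sketch using Postgres through SQLAlchemy. The connection string, table name, and query are illustrative only, not taken from the linked examples.

import pandas as pd
from openflow import DataSource
from sqlalchemy import create_engine

# illustrative connection string; adapt to your own database
engine = create_engine('postgresql://user:password@localhost:5432/movies_db')

def fetch_from_postgres(_):
    # load the result of a SQL query straight into a DataFrame
    return pd.read_sql('SELECT * FROM movies', engine)

pg_datasource = DataSource(fetch_from_postgres)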

But for now, let's initialize a simple DataSource with the fetch() function written above.

movies_datasource = DataSource(fetch)
print(movies_datasource.get_dataframe())
#       year       imdb     ...     period code decade code
# 0     2013  tt1711425     ...             1.0         1.0
# 1     2012  tt1343727     ...             1.0         1.0
# ...    ...        ...     ...             ...         ...
# 1792  1971  tt0067992     ...             NaN         NaN
# 1793  1970  tt0065466     ...             NaN         NaN

The data fed by the DataSource looks correct.

Now we can compute new columns, also called outputs, which will be appended to the DataSource. For each output, we specify a name and a function. This function performs a transformation over the current DataFrame.

movies_datasource.add_output('percentage_of_max_budget', lambda df: df['budget'] / df['budget'].max())
movies_datasource.add_output('age', lambda df: date.today().year - df['year'])

# you can reuse previously defined outputs
movies_datasource.add_output('cat_age', lambda df: (df['age'] / 7).astype(int))

# we force the computation because `get_dataframe()` was already called once before
print(movies_datasource.get_dataframe(force_computation=True))

#       year       imdb     ...     percentage_of_max_budget  age  cat_age
# 0     2013  tt1711425     ...                     0.030588    5        0
# 1     2012  tt1343727     ...                     0.105882    6        0
# ...    ...        ...     ...                          ...  ...      ...
# 1792  1971  tt0067992     ...                     0.007059   47        6
# 1793  1970  tt0065466     ...                     0.002353   48        6

Three new outputs have been added to the original DataSource.
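
As a quick sanity check before going serverless, we can already query the computed outputs locally. The exact age depends on the year you run this; 47 matches the sample output above.

df = movies_datasource.get_dataframe()
print(df[df['imdb'] == 'tt0067992'][['year', 'age']])
#       year  age
# 1792  1971   47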


Serverless

Now that our DataSource is ready, we can define an OpenFaaS function.

$ faas new --lang python3-debian movie-age
$ printf "openflow\n" > movie-age/requirements.txt

Write the handler in movie-age/handler.py.

import json
from datetime import date

import pandas as pd
from openflow import DataSource

url = 'https://raw.githubusercontent.com/fivethirtyeight/data/master/bechdel/movies.csv'
fetch = lambda _: pd.read_csv(url)
movies_datasource = DataSource(fetch)
movies_datasource.add_output('age', lambda df: date.today().year - df['year'])
df = movies_datasource.get_dataframe()

def handle(req):
    json_req = json.loads(req)
    imdb = json_req.get('imdb')
    # cast the numpy integer to a plain int so json.dumps can serialize it
    age = int(df[df['imdb'] == imdb]['age'].iloc[0])
    print(json.dumps({'imdb': imdb, 'age': age}))
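
You can exercise the handler locally before building the image. This is just a direct call to handle() from a Python shell where the module above has been imported:

handle(json.dumps({'imdb': 'tt0067992'}))
# {"imdb": "tt0067992", "age": 47}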

Deploy

$ faas build -f movie-age.yml && faas deploy -f movie-age.yml

Test

$ curl localhost:8080/function/movie-age -d '{ "imdb": "tt0067992" }'
# {"imdb": "tt0067992", "age": 47 }

Willy Wonka & the Chocolate Factory came out 47 years ago.


Thanks for reading. In an upcoming post, I will present a stack for serverless machine learning: one endpoint for training and one endpoint for prediction. It will probably use Minio to store trained models, as described here.