Serverless Data Fetching stack with OpenFaaS & OpenFlow
The goal of this post is to build a Serverless app that can answer questions about data. This data will be fetched from a CSV file and formatted on-the-fly at runtime.
The Serverless part will be handled by OpenFaaS.
The data flow will be taken care of by OpenFlow, a Python library that aims to ease the management of data flows inside applications. It uses pandas DataFrames as its primary tool.
Data source
Install the dependencies
$ pip install openflow
To use OpenFlow, you need to define a fetch() function. This function will fetch the data from the source of your choice. In this example, the source will be this CSV file containing a list of movies.
The fetch() function has to return a pandas.DataFrame instance.
from datetime import date
import pandas as pd
from openflow import DataSource
url = 'https://raw.githubusercontent.com/fivethirtyeight/data/master/bechdel/movies.csv'
fetch = lambda _: pd.read_csv(url)
More complex examples of fetch() functions can be found here. They show how to use a Postgres or MongoDB database as a DataSource.
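If you do not want to follow the link right now, here is a rough sketch of what a Postgres-backed fetch() could look like. The connection string, database, and table names below are placeholders to adapt to your own setup.
import pandas as pd
from sqlalchemy import create_engine
from openflow import DataSource

# placeholder connection string: adapt user, password, host and database name
engine = create_engine('postgresql://user:password@localhost:5432/moviesdb')

def fetch(_):
    # load the whole movies table into a pandas DataFrame
    return pd.read_sql('SELECT * FROM movies', engine)

pg_datasource = DataSource(fetch)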
But for now, let's initialize a simple DataSource with the fetch() function written above.
movies_datasource = DataSource(fetch)
print(movies_datasource.get_dataframe())
# year imdb ... period code decade code
# 0 2013 tt1711425 ... 1.0 1.0
# 1 2012 tt1343727 ... 1.0 1.0
# ... ... ... ... ... ...
# 1792 1971 tt0067992 ... NaN NaN
# 1793 1970 tt0065466 ... NaN NaN
The data fed by the DataSource looks correct.
Now we can compute new columns, also called outputs, which will be appended to the DataSource. For each output, we specify a name and a function. This function performs a transformation over the current DataFrame.
movies_datasource.add_output('percentage_of_max_budget', lambda df: df['budget'] / df['budget'].max())
movies_datasource.add_output('age', lambda df: date.today().year - df['year'])
# you can reuse previously defined outputs
movies_datasource.add_output('cat_age', lambda df: (df['age'] / 7).astype(int))
# we force the computation because `get_dataframe()` was already called once before
print(movies_datasource.get_dataframe(force_computation=True))
# year imdb ... percentage_of_max_budget age cat_age
# 0 2013 tt1711425 ... 0.030588 5 0
# 1 2012 tt1343727 ... 0.105882 6 0
# ... ... ... ... ... ... ...
# 1792 1971 tt0067992 ... 0.007059 47 6
# 1793 1970 tt0065466 ... 0.002353 48 6
Three new outputs have been added to the original DataSource.
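As a quick sanity check, we can peek at the new columns only, reusing the movies_datasource defined above:
df = movies_datasource.get_dataframe()
print(df[['imdb', 'percentage_of_max_budget', 'age', 'cat_age']].head())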
Serverless
Now that our DataSource is ready, we can define an OpenFaaS function.
$ faas new --lang python3-debian movie-age
$ printf "openflow\n" > movie-age/requirements.txt
Write the handler in movie-age/handler.py.
import json
from datetime import date
import pandas as pd
from openflow import DataSource
url = 'https://raw.githubusercontent.com/fivethirtyeight/data/master/bechdel/movies.csv'
fetch = lambda _: pd.read_csv(url)
movies_datasource = DataSource(fetch)
movies_datasource.add_output('age', lambda df: date.today().year - df['year'])
df = movies_datasource.get_dataframe()
def handle(req):
    json_req = json.loads(req)
    imdb = json_req.get('imdb')
    # look up the movie by its IMDb id; cast the numpy integer to a plain int
    # so that json.dumps can serialize it
    age = int(df[df['imdb'] == imdb]['age'].iloc[0])
    print(json.dumps({'imdb': imdb, 'age': age}))
Deploy
$ faas build -f movie-age.yml && faas deploy -f movie-age.yml
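For reference, faas new also generated the movie-age.yml stack file referenced above. Yours may differ depending on your faas-cli version, gateway address, and image registry, but it should look roughly like this:
provider:
  name: openfaas
  gateway: http://127.0.0.1:8080

functions:
  movie-age:
    lang: python3-debian
    handler: ./movie-age
    image: movie-age:latest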
Test
$ curl localhost:8080/function/movie-age -d '{ "imdb": "tt0067992" }'
# {"imdb": "tt0067992", "age": 47 }
Willy Wonka & the Chocolate Factory was released 47 years ago.
Thanks for reading. In an upcoming post, I will present a stack for serverless machine learning: one endpoint for training, and one endpoint for prediction. It will most likely use Minio to store the trained models, as described here.