Integrate with Prefect
rubicon_ml offers an integration with Prefect, an open-source workflow management engine used heavily within the Python ecosystem. In Prefect, a unit of work is called a task, and a collection of tasks makes up a flow. A flow represents your workflow pipeline. You can integrate rubicon_ml into your workflows to persist metadata about your experimentation.
We’ll run a Prefect server locally for this example. If you already have Prefect and Docker installed, you can start a Prefect server and agent with these commands:
prefect backend server
prefect server start
# and in a new terminal
prefect agent start
For more context, check out the workflow README on GitHub.
Setting up a simple flow
Now we can get started! Creating Prefect tasks is easy enough on its own, but we’ve added some simple ones to the rubicon_ml library.
[1]:
from rubicon_ml.workflow.prefect import (
    get_or_create_project_task,
    create_experiment_task,
    log_artifact_task,
    log_dataframe_task,
    log_feature_task,
    log_metric_task,
    log_parameter_task,
)
We’ll need a workflow to integrate rubicon_ml logging into, so let’s put together a simple one. To mimic a realistic example, we’ll create tasks for loading data, splitting said data, extracting features, and training a model.
[2]:
from prefect import task

@task
def load_data():
    from sklearn.datasets import load_wine

    return load_wine()
[3]:
@task
def split_data(dataset):
    from sklearn.model_selection import train_test_split

    return train_test_split(
        dataset.data,
        dataset.target,
        test_size=0.25,
    )
[4]:
@task
def get_feature_names(dataset):
    return dataset.feature_names
[5]:
@task
def fit_pred_model(
    train_test_split_result,
    n_components,
    n_neighbors,
    is_standardized
):
    from sklearn import metrics
    from sklearn.decomposition import PCA
    from sklearn.neighbors import KNeighborsClassifier
    from sklearn.pipeline import make_pipeline
    from sklearn.preprocessing import StandardScaler

    X_train, X_test, y_train, y_test = train_test_split_result

    # optionally standardize the inputs before PCA and KNN
    if is_standardized:
        classifier = make_pipeline(
            StandardScaler(),
            PCA(n_components=n_components),
            KNeighborsClassifier(n_neighbors=n_neighbors),
        )
    else:
        classifier = make_pipeline(
            PCA(n_components=n_components),
            KNeighborsClassifier(n_neighbors=n_neighbors),
        )

    classifier.fit(X_train, y_train)
    pred_test = classifier.predict(X_test)
    accuracy = metrics.accuracy_score(y_test, pred_test)

    return accuracy
Without rubicon_ml, here’s what this simple flow would look like:
[6]:
from prefect import Flow

n_components = 2
n_neighbors = 5
is_standardized = True

with Flow("Wine Classification") as flow:
    wine_dataset = load_data()
    feature_names = get_feature_names(wine_dataset)
    train_test_split = split_data(wine_dataset)

    accuracy = fit_pred_model(
        train_test_split,
        n_components,
        n_neighbors,
        is_standardized,
    )
Running a flow and viewing results
Now we’ll register our flow with the local server. First, we’ll have to use the prefect CLI one more time to create a project.
prefect create project 'Wine Classification'
The URL printed from the call to flow.register opens the local Prefect UI. We can use it to run and monitor our flows.
[7]:
flow_id = flow.register(project_name="Wine Classification")
Flow URL: http://localhost:8080/default/flow/aef9fb65-c49b-4c46-89be-e31c357a165e
└── ID: 531ffe8c-fb2b-4855-b5d8-f9bcef51ad2d
└── Project: Wine Classification
└── Labels: ['8c859084633f']
We’ll also put together a function to run our flows and wait for them to finish.
[8]:
import time

from prefect import Client

prefect_client = Client()

def run_flow(client, flow_id):
    flow_run_id = client.create_flow_run(flow_id=flow_id)

    # poll the server until the flow run reaches a finished state
    is_finished = False
    while not is_finished:
        state = client.get_flow_run_info(flow_run_id).state
        print(f"{state.message.strip('.')}. Waiting...")
        time.sleep(3)
        is_finished = state.is_finished()

    assert state.is_successful()
    print(f"Flow run {flow_run_id} was successful!")

    return flow_run_id

flow_run_id = run_flow(prefect_client, flow_id)
Flow run scheduled. Waiting...
Flow run scheduled. Waiting...
All reference tasks succeeded. Waiting...
Flow run 024457a5-aac4-449a-a5a3-7b3986f90cba was successful!
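The run_flow helper above polls until the run finishes, so a run that never finishes would block the notebook indefinitely. Here is a minimal sketch of the same polling pattern with a deadline, using only the standard library. The name wait_until, its parameters, and the stand-in check are hypothetical and are not part of Prefect or rubicon_ml.

```python
import time


def wait_until(is_finished, timeout=60.0, interval=3.0):
    """Poll `is_finished()` until it returns True or `timeout` seconds pass.

    Returns True if the condition was met, False if the deadline passed first.
    """
    deadline = time.monotonic() + timeout
    while True:
        if is_finished():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


# Hypothetical stand-in for a check such as
# `client.get_flow_run_info(flow_run_id).state.is_finished()`:
# it reports "finished" on the third poll.
polls = iter([False, False, True])
finished = wait_until(lambda: next(polls), timeout=1.0, interval=0.01)
print(finished)
```

Returning a boolean instead of asserting leaves the caller free to decide how to handle a run that timed out.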
We assigned a few variables in our flow, most notably accuracy, the result of fit_pred_model. This accuracy metric is how we’ll determine whether or not the model we trained is a success. However, retrieving state variables from flows is a bit cumbersome.
[9]:
info = prefect_client.get_flow_run_info(flow_run_id)

# find the task run that produced `accuracy` by matching its slug
slugs = [t.task_slug for t in info.task_runs]
index = slugs.index(accuracy.slug)

result = info.task_runs[index].state._result.read(
    info.task_runs[index].state._result.location,
)
result.value
[9]:
0.9555555555555556
What’s going on here isn’t exactly intuitive, and all that only extracted one piece of data from one task.
Adding Rubicon to your flow
We can leverage the Prefect tasks within the rubicon_ml library to log all the info we want about our model run. Then, we can use the standard rubicon_ml logging library to retrieve and inspect our metrics and other logged data. This is much simpler than digging through the state of each executed task and extracting its results.
Here’s the same flow from above, this time with rubicon_ml tasks integrated.
[10]:
import os

from prefect import unmapped

root_dir = os.environ.get("RUBICON_ROOT", "rubicon-root")
root_path = f"{os.path.dirname(os.getcwd())}/{root_dir}"

n_components = 2
n_neighbors = 5
is_standardized = True

with Flow("Wine Classification with Rubicon") as flow:
    project = get_or_create_project_task(
        "filesystem",
        root_path,
        "Wine Classification with Prefect",
    )
    experiment = create_experiment_task(
        project,
        name="logged from a Prefect task",
    )

    wine_dataset = load_data()
    feature_names = get_feature_names(wine_dataset)
    train_test_split = split_data(wine_dataset)

    # log one feature per mapped task run, all to the same experiment
    log_feature_task.map(unmapped(experiment), feature_names)
    log_parameter_task(experiment, "n_components", n_components)
    log_parameter_task(experiment, "n_neighbors", n_neighbors)
    log_parameter_task(experiment, "is_standardized", is_standardized)

    accuracy = fit_pred_model(
        train_test_split,
        n_components,
        n_neighbors,
        is_standardized,
    )

    log_metric_task(experiment, "accuracy", accuracy)
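The flow above calls log_feature_task.map with unmapped(experiment), so the task runs once per feature name while the same experiment is passed to every run. The sketch below imitates that calling convention in plain Python to show the idea. Unmapped and map_call are hypothetical stand-ins written for this illustration, not Prefect's implementation, and they make no claim about how Prefect handles edge cases such as arguments of unequal length.

```python
class Unmapped:
    """Hypothetical marker for an argument that is repeated for every call."""

    def __init__(self, value):
        self.value = value


def map_call(fn, *args):
    """Call `fn` once per element of the iterable arguments.

    Arguments wrapped in `Unmapped` are passed unchanged to every call.
    """
    lengths = {len(arg) for arg in args if not isinstance(arg, Unmapped)}
    if len(lengths) != 1:
        raise ValueError("mapped arguments must all have the same length")
    (count,) = lengths

    results = []
    for i in range(count):
        call_args = [arg.value if isinstance(arg, Unmapped) else arg[i] for arg in args]
        results.append(fn(*call_args))
    return results


def log_feature(experiment, name):
    # Stand-in for real logging: just describe what would be logged.
    return f"{experiment} <- {name}"


logged = map_call(log_feature, Unmapped("experiment-1"), ["alcohol", "hue"])
print(logged)  # ['experiment-1 <- alcohol', 'experiment-1 <- hue']
```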
Again, we’ll register and run the flow.
[11]:
flow_with_rubicon_id = flow.register(project_name="Wine Classification")
flow_run_with_rubicon_id = run_flow(prefect_client, flow_with_rubicon_id)
Flow URL: http://localhost:8080/default/flow/9b91bafc-7e8b-4361-b1d0-bc57dac4cebc
└── ID: 081b9522-5db8-4ce6-ae64-ea8afd5b323e
└── Project: Wine Classification
└── Labels: ['8c859084633f']
Flow run scheduled. Waiting...
Submitted for execution. Waiting...
Running flow. Waiting...
All reference tasks succeeded. Waiting...
Flow run 29cef175-c115-4da2-a9f7-40f79bdb6104 was successful!
This time we can use rubicon_ml to inspect our accuracy, among other things!
[12]:
from rubicon_ml import Rubicon

rubicon = Rubicon(persistence="filesystem", root_dir=root_path)
project = rubicon.get_project("Wine Classification with Prefect")
experiment = project.experiments()[0]

features = [f.name for f in experiment.features()]
parameters = [(p.name, p.value) for p in experiment.parameters()]
metrics = [(m.name, m.value) for m in experiment.metrics()]

print(
    f"experiment {experiment.id}\n"
    f"features: {features}\nparameters: {parameters}\n"
    f"metrics: {metrics}"
)
experiment 6b1c3846-5aab-4bda-8c16-15b279d8616c
features: ['alcalinity_of_ash', 'alcohol', 'ash', 'color_intensity', 'flavanoids', 'hue', 'magnesium', 'malic_acid', 'nonflavanoid_phenols', 'od280/od315_of_diluted_wines', 'proanthocyanins', 'proline', 'total_phenols']
parameters: [('is_standardized', True), ('n_components', 2), ('n_neighbors', 5)]
metrics: [('accuracy', 1.0)]
Extracting data from rubicon_ml is much simpler than pulling it from the various tasks as they succeed, especially when we’re running thousands of tasks in a flow.
Concurrent Logging with Dask
Prefect plays nicely with Dask in order to provide parallel computing when possible. The Prefect scheduler is smart enough to know which tasks do and do not depend on each other, so it can intelligently decide when to run independent tasks at the same time.
Let’s run the same flow as above, except this time we’ll log eight different experiments with eight different parameter sets and accuracy results. First, we’ll need to use Dask to start a local cluster.
[13]:
import dask.distributed
from prefect.executors import DaskExecutor
dask_client = dask.distributed.Client()
dask_executor = DaskExecutor(address=dask_client.cluster.scheduler.address)
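The idea that independent tasks can run at the same time can be illustrated without Prefect or Dask. The sketch below is a simplified stand-in, not Prefect's scheduler: it groups the tasks of the earlier wine flow into "waves" whose members do not depend on each other and runs each wave on a thread pool. The dependency dictionary is read off the flow defined earlier; run_task and run_in_waves are hypothetical names, and graphlib requires Python 3.9 or newer.

```python
from concurrent.futures import ThreadPoolExecutor
from graphlib import TopologicalSorter  # Python 3.9+

# Each task maps to the set of tasks whose results it needs.
dependencies = {
    "load_data": set(),
    "get_feature_names": {"load_data"},
    "split_data": {"load_data"},
    "fit_pred_model": {"split_data"},
}


def run_task(name):
    # Placeholder for real work.
    return f"{name} finished"


def run_in_waves(graph, worker):
    """Run tasks in dependency order; tasks in the same wave run concurrently."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()

    waves = []
    with ThreadPoolExecutor() as pool:
        while sorter.is_active():
            # Everything returned here has all of its dependencies completed.
            ready = sorted(sorter.get_ready())
            list(pool.map(worker, ready))  # blocks until the whole wave is done
            sorter.done(*ready)
            waves.append(ready)
    return waves


waves = run_in_waves(dependencies, run_task)
for wave in waves:
    print(wave)
```

Here get_feature_names and split_data land in the same wave because both need only load_data. Waiting for a whole wave before starting the next is a simplification; a real scheduler can start a task as soon as its own dependencies finish.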
Before we look at the new flow, let’s see how easy it is to make our own rubicon_ml Prefect tasks. Currently, the log_feature_task logs one feature to one experiment. Let’s say in this example, we want to log our entire feature set in one task. That’s slightly different from what log_feature_task currently does, so let’s see how we can make a new one using the rubicon_ml logging library.
[14]:
@task
def log_feature_set(experiment, feature_names):
    """log a set of features to a rubicon experiment

    Parameters
    ----------
    experiment : rubicon.Experiment
        the experiment to log the feature set to
    feature_names : list of str
        the names of the features to log to `experiment`
    """
    features = []

    for name in feature_names:
        features.append(experiment.log_feature(name=name))

    return features
Easy! Even though it’s so simple, this particular task is actually more in-depth than the ones you’ll find in the library. The Prefect tasks in the library are simply wrappers around existing logging library functions, like experiment.log_feature.
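Because the body of log_feature_set only calls experiment.log_feature, its logic can be checked without a Prefect server or a real experiment. The sketch below assumes a hypothetical FakeExperiment that records calls, and log_feature_set_body is an undecorated copy of the task body written for this illustration. Neither name exists in rubicon_ml or Prefect.

```python
class FakeExperiment:
    """Hypothetical stand-in that records `log_feature` calls."""

    def __init__(self):
        self.logged = []

    def log_feature(self, name):
        self.logged.append(name)
        return name


def log_feature_set_body(experiment, feature_names):
    """Same logic as the `log_feature_set` task, without the `@task` decorator."""
    return [experiment.log_feature(name=name) for name in feature_names]


fake = FakeExperiment()
result = log_feature_set_body(fake, ["alcohol", "hue"])
print(fake.logged)  # ['alcohol', 'hue']
```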
Now we can make our new flow to log multiple experiments in parallel.
[15]:
n_components = [2, 2, 2, 2, 4, 4, 4, 4]
n_neighbors = [5, 5, 10, 10, 5, 5, 10, 10]
is_standardized = [True, False, True, False, True, False, True, False]

experiment_names = [f"mapped run {i}" for i in range(len(n_components))]

with Flow(
    "Wine Classification with Rubicon - Mapped",
    executor=dask_executor,
) as mapped_flow:
    project = get_or_create_project_task(
        "filesystem",
        root_path,
        "Wine Classification with Prefect - Mapped",
    )
    # one experiment per parameter combination, all in the same project
    experiments = create_experiment_task.map(
        unmapped(project),
        name=experiment_names,
        description=unmapped("concurrent example with Prefect"),
    )

    wine_dataset = load_data()
    feature_names = get_feature_names(wine_dataset)
    train_test_split = split_data(wine_dataset)

    log_feature_set.map(experiments, unmapped(feature_names))
    log_parameter_task.map(experiments, unmapped("n_components"), n_components)
    log_parameter_task.map(experiments, unmapped("n_neighbors"), n_neighbors)
    log_parameter_task.map(experiments, unmapped("is_standardized"), is_standardized)

    accuracies = fit_pred_model.map(
        unmapped(train_test_split),
        n_components,
        n_neighbors,
        is_standardized,
    )

    log_metric_task.map(experiments, unmapped("accuracy"), accuracies)
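The three parameter lists at the top of the cell above are written out by hand. As a sketch, the same eight combinations could be generated with itertools.product from the standard library. The value lists [2, 4], [5, 10], and [True, False] are read off that cell, and the variable named combinations is introduced here for illustration.

```python
from itertools import product

# Every combination of the values used in the mapped flow; the last
# argument varies fastest, which matches the hand-written ordering.
combinations = list(product([2, 4], [5, 10], [True, False]))

# Transpose the list of tuples back into one list per parameter.
n_components, n_neighbors, is_standardized = (
    list(column) for column in zip(*combinations)
)
experiment_names = [f"mapped run {i}" for i in range(len(combinations))]

print(n_components)     # [2, 2, 2, 2, 4, 4, 4, 4]
print(n_neighbors)      # [5, 5, 10, 10, 5, 5, 10, 10]
print(is_standardized)  # [True, False, True, False, True, False, True, False]
```

Generating the lists this way keeps them the same length as more values are added, which mapped calls over several arguments rely on.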
Let’s register and run one last flow. If you check out the timeline for your completed flows on the UI linked by mapped_flow.register, you’ll notice tasks executing at the same time now.
[16]:
flow_with_concurrent_rubicon_id = mapped_flow.register(
    project_name="Wine Classification",
)
flow_run_with_concurrent_rubicon_id = run_flow(
    prefect_client,
    flow_with_concurrent_rubicon_id,
)
Flow URL: http://localhost:8080/default/flow/1ff30ad8-530c-4502-a9ad-cfed39b145fd
└── ID: 5f925f68-877c-4922-952c-927da5cf5a34
└── Project: Wine Classification
└── Labels: ['8c859084633f']
Flow run scheduled. Waiting...
Flow run scheduled. Waiting...
Running flow. Waiting...
Running flow. Waiting...
Running flow. Waiting...
All reference tasks succeeded. Waiting...
Flow run 418550e0-01eb-4d48-b78b-fbe101da0622 was successful!
Retrieving all the results of a mapped task is even more cumbersome than retrieving just the accuracy above. We’ll simply use the rubicon_ml Dashboard to check out all the data we just logged!
[17]:
from rubicon_ml.viz import Dashboard

# reuse the `rubicon` client created earlier to load the mapped project
project = rubicon.get_project("Wine Classification with Prefect - Mapped")

Dashboard(project.experiments()).show()
* Running on http://127.0.0.1:8050/ (Press CTRL+C to quit)
127.0.0.1 - - [16/Apr/2021 15:29:39] "GET /_alive_71180c28-37bc-4127-a50b-e754fe3e6812 HTTP/1.1" 200 -
Dash app running on http://127.0.0.1:8050/