View this notebook on GitHub or run it yourself on Binder!
Integrate with Prefect¶
rubicon_ml
offers an integration with Prefect, an open source workflow management engine used heavily within the Python ecosystem. In Prefect, a unit of work is called a task, and a collection of tasks makes up a flow. A flow represents your workflow pipeline. You can integrate rubicon_ml
into your workflows to persist metadata about your experimentation.
We’ll run a Prefect server locally for this example. If you already have Prefect and Docker installed, you can start a Prefect server and agent with these commands:
prefect backend server
prefect server start
# and in a new terminal
prefect agent start
For more context, check out the workflow README on GitHub.
Setting up a simple flow¶
Now we can get started! Let’s create some simple Prefect tasks for the core rubcion_ml
loggers.
[1]:
from prefect import task
@task
def get_or_create_project_task(
persistence, root_dir, project_name, auto_git_enabled=False, storage_options={}, **kwargs
):
from rubicon_ml import Rubicon
rubicon = Rubicon(
persistence=persistence,
root_dir=root_dir,
auto_git_enabled=auto_git_enabled,
**storage_options,
)
project = rubicon.get_or_create_project(project_name, **kwargs)
return project
@task
def create_experiment_task(project, **kwargs):
return project.log_experiment(**kwargs)
@task
def log_artifact_task(parent, **kwargs):
return parent.log_artifact(**kwargs)
@task
def log_dataframe_task(parent, df, **kwargs):
return parent.log_dataframe(df, **kwargs)
@task
def log_feature_task(experiment, feature_name, **kwargs):
return experiment.log_feature(feature_name, **kwargs)
@task
def log_metric_task(experiment, metric_name, metric_value, **kwargs):
return experiment.log_metric(metric_name, metric_value, **kwargs)
@task
def log_parameter_task(experiment, parameter_name, parameter_value, **kwargs):
return experiment.log_parameter(parameter_name, parameter_value, **kwargs)
We’ll need a workflow to integrate rubicon_ml
logging into, so let’s put together a simple one. To mimic a realistic example, we’ll create tasks for loading data, splitting said data, extracting features, and training a model.
[2]:
@task
def load_data():
from sklearn.datasets import load_wine
return load_wine()
[3]:
@task
def split_data(dataset):
from sklearn.model_selection import train_test_split
return train_test_split(
dataset.data,
dataset.target,
test_size=0.25,
)
[4]:
@task
def get_feature_names(dataset):
return dataset.feature_names
[5]:
@task
def fit_pred_model(
train_test_split_result,
n_components,
n_neighbors,
is_standardized
):
from sklearn import metrics
from sklearn.decomposition import PCA
from sklearn.neighbors import KNeighborsClassifier
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
X_train, X_test, y_train, y_test = train_test_split_result
if is_standardized:
classifier = make_pipeline(
StandardScaler(),
PCA(n_components=n_components),
KNeighborsClassifier(n_neighbors=n_neighbors),
)
else:
classifier = make_pipeline(
PCA(n_components=n_components),
KNeighborsClassifier(n_neighbors=n_neighbors),
)
classifier.fit(X_train, y_train)
pred_test = classifier.predict(X_test)
accuracy = metrics.accuracy_score(y_test, pred_test)
return accuracy
Without rubicon_ml
, here’s what this simple flow would look like:
[6]:
from prefect import flow
n_components = 2
n_neighbors = 5
is_standardized = True
@flow
def wine_flow():
wine_dataset = load_data()
feature_names = get_feature_names(wine_dataset)
train_test_split = split_data(wine_dataset)
return fit_pred_model(
train_test_split,
n_components,
n_neighbors,
is_standardized,
)
Running a flow and viewing results¶
Now we’ll run the flow.
[7]:
accuracy = wine_flow()
accuracy
00:56:44.598 | INFO | prefect.engine - Created flow run 'rugged-pony' for flow 'wine-flow'
00:56:44.646 | INFO | Flow run 'rugged-pony' - Created task run 'load_data-0' for task 'load_data'
00:56:44.647 | INFO | Flow run 'rugged-pony' - Executing 'load_data-0' immediately...
00:56:45.352 | INFO | Task run 'load_data-0' - Finished in state Completed()
00:56:45.372 | INFO | Flow run 'rugged-pony' - Created task run 'get_feature_names-0' for task 'get_feature_names'
00:56:45.373 | INFO | Flow run 'rugged-pony' - Executing 'get_feature_names-0' immediately...
00:56:45.423 | INFO | Task run 'get_feature_names-0' - Finished in state Completed()
00:56:45.440 | INFO | Flow run 'rugged-pony' - Created task run 'split_data-0' for task 'split_data'
00:56:45.441 | INFO | Flow run 'rugged-pony' - Executing 'split_data-0' immediately...
00:56:45.547 | INFO | Task run 'split_data-0' - Finished in state Completed()
00:56:45.566 | INFO | Flow run 'rugged-pony' - Created task run 'fit_pred_model-0' for task 'fit_pred_model'
00:56:45.567 | INFO | Flow run 'rugged-pony' - Executing 'fit_pred_model-0' immediately...
00:56:45.693 | INFO | Task run 'fit_pred_model-0' - Finished in state Completed()
00:56:45.719 | INFO | Flow run 'rugged-pony' - Finished in state Completed()
[7]:
0.9333333333333333
Adding Rubicon to your flow¶
We can leverage the Prefect tasks within the rubicon_ml
library to log all the info we want about our model run. Then, we can use the standard rubicon_ml
logging library to retrieve and inspect our metrics and other logged data. This is much simpler than digging through the state of each executed task and extracting its results.
Here’s the same flow from above, this time with rubicon_ml
tasks integrated.
[8]:
import os
from prefect import unmapped
n_components = 2
n_neighbors = 5
is_standardized = True
@flow
def rubicon_wine_flow():
project = get_or_create_project_task(
"memory",
".",
"Wine Classification with Prefect",
)
experiment = create_experiment_task(
project,
name="logged from a Prefect task",
)
wine_dataset = load_data()
feature_names = get_feature_names(wine_dataset)
train_test_split = split_data(wine_dataset)
log_feature_task.map(unmapped(experiment), feature_names)
log_parameter_task(experiment, "n_components", n_components)
log_parameter_task(experiment, "n_neighbors", n_neighbors)
log_parameter_task(experiment, "is_standardized", is_standardized)
accuracy = fit_pred_model(
train_test_split,
n_components,
n_neighbors,
is_standardized,
)
log_metric_task(experiment, "accuracy", accuracy)
return accuracy
Again, we’ll register and run the flow.
[9]:
accuracy = rubicon_wine_flow()
accuracy
00:56:45.890 | INFO | prefect.engine - Created flow run 'laughing-newt' for flow 'rubicon-wine-flow'
00:56:45.942 | INFO | Flow run 'laughing-newt' - Created task run 'get_or_create_project_task-0' for task 'get_or_create_project_task'
00:56:45.943 | INFO | Flow run 'laughing-newt' - Executing 'get_or_create_project_task-0' immediately...
00:56:45.995 | INFO | Task run 'get_or_create_project_task-0' - Finished in state Completed()
00:56:46.011 | INFO | Flow run 'laughing-newt' - Created task run 'create_experiment_task-0' for task 'create_experiment_task'
00:56:46.012 | INFO | Flow run 'laughing-newt' - Executing 'create_experiment_task-0' immediately...
00:56:46.052 | INFO | Task run 'create_experiment_task-0' - Finished in state Completed()
00:56:46.068 | INFO | Flow run 'laughing-newt' - Created task run 'load_data-0' for task 'load_data'
00:56:46.069 | INFO | Flow run 'laughing-newt' - Executing 'load_data-0' immediately...
00:56:46.112 | INFO | Task run 'load_data-0' - Finished in state Completed()
00:56:46.129 | INFO | Flow run 'laughing-newt' - Created task run 'get_feature_names-0' for task 'get_feature_names'
00:56:46.129 | INFO | Flow run 'laughing-newt' - Executing 'get_feature_names-0' immediately...
00:56:46.168 | INFO | Task run 'get_feature_names-0' - Finished in state Completed()
00:56:46.184 | INFO | Flow run 'laughing-newt' - Created task run 'split_data-0' for task 'split_data'
00:56:46.185 | INFO | Flow run 'laughing-newt' - Executing 'split_data-0' immediately...
00:56:46.232 | INFO | Task run 'split_data-0' - Finished in state Completed()
00:56:46.311 | INFO | Flow run 'laughing-newt' - Created task run 'log_feature_task-12' for task 'log_feature_task'
00:56:46.312 | INFO | Flow run 'laughing-newt' - Submitted task run 'log_feature_task-12' for execution.
00:56:46.322 | INFO | Flow run 'laughing-newt' - Created task run 'log_feature_task-7' for task 'log_feature_task'
00:56:46.323 | INFO | Flow run 'laughing-newt' - Submitted task run 'log_feature_task-7' for execution.
00:56:46.334 | INFO | Flow run 'laughing-newt' - Created task run 'log_feature_task-8' for task 'log_feature_task'
00:56:46.335 | INFO | Flow run 'laughing-newt' - Submitted task run 'log_feature_task-8' for execution.
00:56:46.356 | INFO | Flow run 'laughing-newt' - Created task run 'log_feature_task-6' for task 'log_feature_task'
00:56:46.357 | INFO | Flow run 'laughing-newt' - Submitted task run 'log_feature_task-6' for execution.
00:56:46.399 | INFO | Flow run 'laughing-newt' - Created task run 'log_feature_task-3' for task 'log_feature_task'
00:56:46.400 | INFO | Flow run 'laughing-newt' - Submitted task run 'log_feature_task-3' for execution.
00:56:46.417 | INFO | Task run 'log_feature_task-12' - Finished in state Completed()
00:56:46.425 | INFO | Task run 'log_feature_task-8' - Finished in state Completed()
00:56:46.444 | INFO | Flow run 'laughing-newt' - Created task run 'log_feature_task-10' for task 'log_feature_task'
00:56:46.445 | INFO | Flow run 'laughing-newt' - Submitted task run 'log_feature_task-10' for execution.
00:56:46.459 | INFO | Flow run 'laughing-newt' - Created task run 'log_feature_task-4' for task 'log_feature_task'
00:56:46.461 | INFO | Flow run 'laughing-newt' - Submitted task run 'log_feature_task-4' for execution.
00:56:46.499 | INFO | Task run 'log_feature_task-6' - Finished in state Completed()
00:56:46.508 | INFO | Flow run 'laughing-newt' - Created task run 'log_feature_task-0' for task 'log_feature_task'
00:56:46.509 | INFO | Flow run 'laughing-newt' - Submitted task run 'log_feature_task-0' for execution.
00:56:46.524 | INFO | Task run 'log_feature_task-7' - Finished in state Completed()
00:56:46.543 | INFO | Task run 'log_feature_task-10' - Finished in state Completed()
00:56:46.556 | INFO | Flow run 'laughing-newt' - Created task run 'log_feature_task-11' for task 'log_feature_task'
00:56:46.557 | INFO | Flow run 'laughing-newt' - Submitted task run 'log_feature_task-11' for execution.
00:56:46.567 | INFO | Flow run 'laughing-newt' - Created task run 'log_parameter_task-0' for task 'log_parameter_task'
00:56:46.568 | INFO | Flow run 'laughing-newt' - Executing 'log_parameter_task-0' immediately...
00:56:46.612 | INFO | Task run 'log_feature_task-11' - Finished in state Completed()
00:56:46.620 | INFO | Task run 'log_feature_task-3' - Finished in state Completed()
00:56:46.627 | INFO | Task run 'log_parameter_task-0' - Finished in state Completed()
00:56:46.640 | INFO | Flow run 'laughing-newt' - Created task run 'log_parameter_task-1' for task 'log_parameter_task'
00:56:46.641 | INFO | Flow run 'laughing-newt' - Executing 'log_parameter_task-1' immediately...
00:56:46.651 | INFO | Task run 'log_feature_task-0' - Finished in state Completed()
00:56:46.668 | INFO | Flow run 'laughing-newt' - Created task run 'log_feature_task-1' for task 'log_feature_task'
00:56:46.669 | INFO | Flow run 'laughing-newt' - Submitted task run 'log_feature_task-1' for execution.
00:56:46.696 | INFO | Task run 'log_parameter_task-1' - Finished in state Completed()
00:56:46.713 | INFO | Flow run 'laughing-newt' - Created task run 'log_parameter_task-2' for task 'log_parameter_task'
00:56:46.714 | INFO | Flow run 'laughing-newt' - Executing 'log_parameter_task-2' immediately...
00:56:46.724 | INFO | Task run 'log_feature_task-1' - Finished in state Completed()
00:56:46.749 | INFO | Task run 'log_parameter_task-2' - Finished in state Completed()
00:56:46.761 | INFO | Flow run 'laughing-newt' - Created task run 'fit_pred_model-0' for task 'fit_pred_model'
00:56:46.761 | INFO | Flow run 'laughing-newt' - Executing 'fit_pred_model-0' immediately...
00:56:46.771 | INFO | Flow run 'laughing-newt' - Created task run 'log_feature_task-2' for task 'log_feature_task'
00:56:46.772 | INFO | Flow run 'laughing-newt' - Submitted task run 'log_feature_task-2' for execution.
00:56:46.817 | INFO | Task run 'fit_pred_model-0' - Finished in state Completed()
00:56:46.839 | INFO | Flow run 'laughing-newt' - Created task run 'log_metric_task-0' for task 'log_metric_task'
00:56:46.840 | INFO | Flow run 'laughing-newt' - Executing 'log_metric_task-0' immediately...
00:56:46.854 | INFO | Task run 'log_feature_task-2' - Finished in state Completed()
00:56:46.871 | INFO | Task run 'log_feature_task-4' - Finished in state Completed()
00:56:46.885 | INFO | Task run 'log_metric_task-0' - Finished in state Completed()
00:56:46.971 | INFO | Flow run 'laughing-newt' - Created task run 'log_feature_task-5' for task 'log_feature_task'
00:56:46.972 | INFO | Flow run 'laughing-newt' - Submitted task run 'log_feature_task-5' for execution.
00:56:47.004 | INFO | Task run 'log_feature_task-5' - Finished in state Completed()
00:56:47.083 | INFO | Flow run 'laughing-newt' - Created task run 'log_feature_task-9' for task 'log_feature_task'
00:56:47.084 | INFO | Flow run 'laughing-newt' - Submitted task run 'log_feature_task-9' for execution.
00:56:47.132 | INFO | Task run 'log_feature_task-9' - Finished in state Completed()
00:56:47.152 | INFO | Flow run 'laughing-newt' - Finished in state Completed()
[9]:
1.0
This time we can use rubicon_ml
to inspect our accuracy, among other things!
[10]:
from rubicon_ml import Rubicon
rubicon = Rubicon(persistence="memory", root_dir=".")
project = rubicon.get_project("Wine Classification with Prefect")
experiment = project.experiments()[0]
features = [f.name for f in experiment.features()]
parameters = [(p.name, p.value) for p in experiment.parameters()]
metrics = [(m.name, m.value) for m in experiment.metrics()]
print(
f"experiment {experiment.id}\n"
f"features: {features}\nparameters: {parameters}\n"
f"metrics: {metrics}"
)
experiment 35173154-a912-4ed0-9920-188d62e6f3d2
features: ['proline', 'proanthocyanins', 'flavanoids', 'alcalinity_of_ash', 'nonflavanoid_phenols', 'hue', 'alcohol', 'od280/od315_of_diluted_wines', 'malic_acid', 'ash', 'magnesium', 'total_phenols', 'color_intensity']
parameters: [('n_components', 2), ('n_neighbors', 5), ('is_standardized', True)]
metrics: [('accuracy', 1.0)]