

Integrate with Prefect

rubicon_ml offers an integration with Prefect, an open-source workflow management engine widely used in the Python ecosystem. In Prefect, a unit of work is called a task, and a collection of tasks makes up a flow, which represents your workflow pipeline. You can integrate rubicon_ml into your flows to persist metadata about your experiments.

We’ll run a Prefect server locally for this example. If you already have Prefect installed, you can start a local Prefect server with this command (on Prefect 2 releases before 2.8, the equivalent command is prefect orion start):

prefect server start

For more context, check out the workflow README on GitHub.

Setting up a simple flow

Now we can get started! Creating Prefect tasks is easy on its own, but rubicon_ml also ships a few ready-made tasks for common logging operations:

[1]:
from rubicon_ml.workflow.prefect import (
    get_or_create_project_task,
    create_experiment_task,
    log_artifact_task,
    log_dataframe_task,
    log_feature_task,
    log_metric_task,
    log_parameter_task,
)

We’ll need a workflow to integrate rubicon_ml logging into, so let’s put together a simple one. To mimic a realistic example, we’ll create tasks for loading the data, splitting it into training and test sets, extracting the feature names, and training a model.

[2]:
from prefect import task


@task
def load_data():
    from sklearn.datasets import load_wine

    return load_wine()
[3]:
@task
def split_data(dataset):
    from sklearn.model_selection import train_test_split

    return train_test_split(
        dataset.data,
        dataset.target,
        test_size=0.25,
    )
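Note that split_data does not pass random_state to train_test_split, so each flow run shuffles the rows differently; this is the most likely reason the two runs in this notebook report different accuracies. The stdlib-only sketch below illustrates the idea with a hypothetical split_indices helper (not part of scikit-learn or rubicon_ml) that follows scikit-learn's rule of putting ceil(test_size * n_samples) rows in the test set. A fixed seed makes the split repeatable; in the real task, the equivalent is passing an integer random_state to train_test_split.

```python
import math
import random


def split_indices(n_samples, test_size, seed=None):
    """Hypothetical helper: shuffle row indices, then split off a test set.

    Mirrors scikit-learn's rule for a float test_size:
    n_test = ceil(test_size * n_samples); the remaining rows go to training.
    """
    rng = random.Random(seed)  # seed=None draws fresh entropy on every call
    indices = list(range(n_samples))
    rng.shuffle(indices)
    n_test = math.ceil(test_size * n_samples)
    return indices[n_test:], indices[:n_test]  # (train, test)


# The wine dataset has 178 samples, so test_size=0.25 gives 45 test rows.
train_a, test_a = split_indices(178, 0.25, seed=42)
train_b, test_b = split_indices(178, 0.25, seed=42)
print(len(train_a), len(test_a))  # 133 45
print(test_a == test_b)  # True: same seed, same split
```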
[4]:
@task
def get_feature_names(dataset):
    return dataset.feature_names
[5]:
@task
def fit_pred_model(
    train_test_split_result,
    n_components,
    n_neighbors,
    is_standardized
):
    from sklearn import metrics
    from sklearn.decomposition import PCA
    from sklearn.neighbors import KNeighborsClassifier
    from sklearn.pipeline import make_pipeline
    from sklearn.preprocessing import StandardScaler

    X_train, X_test, y_train, y_test = train_test_split_result

    if is_standardized:
        classifier = make_pipeline(
            StandardScaler(),
            PCA(n_components=n_components),
            KNeighborsClassifier(n_neighbors=n_neighbors),
        )
    else:
        classifier = make_pipeline(
            PCA(n_components=n_components),
            KNeighborsClassifier(n_neighbors=n_neighbors),
        )

    classifier.fit(X_train, y_train)
    pred_test = classifier.predict(X_test)
    accuracy = metrics.accuracy_score(y_test, pred_test)

    return accuracy
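When is_standardized is True, the pipeline rescales every feature with StandardScaler before PCA. PCA keeps the directions of largest variance, so without rescaling, features measured on a large scale (such as proline in the wine data) can dominate the components. The sketch below shows, for a single column in plain Python, the z-score transform StandardScaler applies by default: (x - mean) / std, using the population standard deviation. standardize is a hypothetical helper for illustration, not part of scikit-learn.

```python
import statistics


def standardize(column):
    """Hypothetical helper: z-score one feature column like StandardScaler.

    Uses the population standard deviation (ddof=0), as StandardScaler does.
    A constant column is mapped to zeros instead of dividing by zero.
    """
    mean = statistics.fmean(column)
    std = statistics.pstdev(column)
    if std == 0:
        return [0.0 for _ in column]
    return [(x - mean) / std for x in column]


z = standardize([1.0, 2.0, 3.0])
print([round(v, 4) for v in z])  # [-1.2247, 0.0, 1.2247]
```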

Without rubicon_ml, here’s what this simple flow would look like:

[6]:
from prefect import flow


n_components = 2
n_neighbors = 5
is_standardized = True

@flow
def wine_flow():
    wine_dataset = load_data()

    feature_names = get_feature_names(wine_dataset)
    train_test_split = split_data(wine_dataset)

    return fit_pred_model(
        train_test_split,
        n_components,
        n_neighbors,
        is_standardized,
    )

Running a flow and viewing results

Now we’ll run the flow.

[7]:
accuracy = wine_flow()
accuracy
00:56:44.598 | INFO    | prefect.engine - Created flow run 'rugged-pony' for flow 'wine-flow'
00:56:44.646 | INFO    | Flow run 'rugged-pony' - Created task run 'load_data-0' for task 'load_data'
00:56:44.647 | INFO    | Flow run 'rugged-pony' - Executing 'load_data-0' immediately...
00:56:45.352 | INFO    | Task run 'load_data-0' - Finished in state Completed()
00:56:45.372 | INFO    | Flow run 'rugged-pony' - Created task run 'get_feature_names-0' for task 'get_feature_names'
00:56:45.373 | INFO    | Flow run 'rugged-pony' - Executing 'get_feature_names-0' immediately...
00:56:45.423 | INFO    | Task run 'get_feature_names-0' - Finished in state Completed()
00:56:45.440 | INFO    | Flow run 'rugged-pony' - Created task run 'split_data-0' for task 'split_data'
00:56:45.441 | INFO    | Flow run 'rugged-pony' - Executing 'split_data-0' immediately...
00:56:45.547 | INFO    | Task run 'split_data-0' - Finished in state Completed()
00:56:45.566 | INFO    | Flow run 'rugged-pony' - Created task run 'fit_pred_model-0' for task 'fit_pred_model'
00:56:45.567 | INFO    | Flow run 'rugged-pony' - Executing 'fit_pred_model-0' immediately...
00:56:45.693 | INFO    | Task run 'fit_pred_model-0' - Finished in state Completed()
00:56:45.719 | INFO    | Flow run 'rugged-pony' - Finished in state Completed()
[7]:
0.9333333333333333
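metrics.accuracy_score returns the fraction of test predictions that match the true labels. With 45 test rows (ceil(0.25 * 178)), an accuracy of 0.9333... corresponds to 42 correct predictions, i.e. 42/45 = 14/15. The plain-Python sketch below reproduces that arithmetic; accuracy is a hypothetical helper, and the labels are made up so that exactly three rows are misclassified.

```python
def accuracy(y_true, y_pred):
    """Hypothetical helper: fraction of predictions equal to the true labels."""
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")
    correct = sum(t == p for t, p in zip(y_true, y_pred))
    return correct / len(y_true)


# Made-up labels: 45 test rows spread across the three wine classes (0, 1, 2).
y_true = [0] * 15 + [1] * 15 + [2] * 15
y_pred = list(y_true)
for i in (0, 16, 32):  # misclassify three rows
    y_pred[i] = (y_pred[i] + 1) % 3

print(accuracy(y_true, y_pred))  # 0.9333333333333333
```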

Adding Rubicon to your flow

We can use the Prefect tasks in rubicon_ml to log everything we want to know about our model run. Then we can use the standard rubicon_ml client to retrieve and inspect the logged metrics and other data, which is much simpler than digging through the state of each executed task to extract its results.

Here’s the same flow from above, this time with rubicon_ml tasks integrated.

[8]:
from prefect import unmapped


n_components = 2
n_neighbors = 5
is_standardized = True

@flow
def rubicon_wine_flow():
    project = get_or_create_project_task(
        "memory",
        ".",
        "Wine Classification with Prefect",
    )
    experiment = create_experiment_task(
        project,
        name="logged from a Prefect task",
    )

    wine_dataset = load_data()
    feature_names = get_feature_names(wine_dataset)
    train_test_split = split_data(wine_dataset)

    log_feature_task.map(unmapped(experiment), feature_names)
    log_parameter_task(experiment, "n_components", n_components)
    log_parameter_task(experiment, "n_neighbors", n_neighbors)
    log_parameter_task(experiment, "is_standardized", is_standardized)

    accuracy = fit_pred_model(
        train_test_split,
        n_components,
        n_neighbors,
        is_standardized,
    )

    log_metric_task(experiment, "accuracy", accuracy)

    return accuracy
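The call log_feature_task.map(unmapped(experiment), feature_names) creates one log_feature_task run per feature name, and unmapped tells Prefect to pass the same experiment to every run instead of iterating over it. That is why the run log below shows thirteen log_feature_task runs, one for each wine feature. The stdlib sketch below mimics only this argument expansion; Unmapped and sketch_map are hypothetical stand-ins, not Prefect's implementation, and unlike Prefect they call the function sequentially in the current process.

```python
from typing import Any, Callable, List, Sequence


class Unmapped:
    """Hypothetical stand-in for prefect.unmapped: reuse this value on every call."""

    def __init__(self, value: Any) -> None:
        self.value = value


def sketch_map(fn: Callable[..., Any], *args: Any) -> List[Any]:
    """Call fn once per element of the mapped (sequence) arguments.

    Unmapped arguments are broadcast unchanged to every call.
    """
    mapped: List[Sequence[Any]] = [a for a in args if not isinstance(a, Unmapped)]
    lengths = {len(a) for a in mapped}
    if len(lengths) != 1:
        raise ValueError("need at least one mapped argument, all of equal length")
    n_calls = lengths.pop()
    results = []
    for i in range(n_calls):
        call_args = [a.value if isinstance(a, Unmapped) else a[i] for a in args]
        results.append(fn(*call_args))
    return results


logged = []


def log_feature(experiment: str, name: str) -> str:
    """Toy logger standing in for log_feature_task."""
    logged.append((experiment, name))
    return name


sketch_map(log_feature, Unmapped("experiment-1"), ["alcohol", "malic_acid", "ash"])
print(logged)
# [('experiment-1', 'alcohol'), ('experiment-1', 'malic_acid'), ('experiment-1', 'ash')]
```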

Again, we’ll run the flow.

[9]:
accuracy = rubicon_wine_flow()
accuracy
00:56:45.890 | INFO    | prefect.engine - Created flow run 'laughing-newt' for flow 'rubicon-wine-flow'
00:56:45.942 | INFO    | Flow run 'laughing-newt' - Created task run 'get_or_create_project_task-0' for task 'get_or_create_project_task'
00:56:45.943 | INFO    | Flow run 'laughing-newt' - Executing 'get_or_create_project_task-0' immediately...
00:56:45.995 | INFO    | Task run 'get_or_create_project_task-0' - Finished in state Completed()
00:56:46.011 | INFO    | Flow run 'laughing-newt' - Created task run 'create_experiment_task-0' for task 'create_experiment_task'
00:56:46.012 | INFO    | Flow run 'laughing-newt' - Executing 'create_experiment_task-0' immediately...
00:56:46.052 | INFO    | Task run 'create_experiment_task-0' - Finished in state Completed()
00:56:46.068 | INFO    | Flow run 'laughing-newt' - Created task run 'load_data-0' for task 'load_data'
00:56:46.069 | INFO    | Flow run 'laughing-newt' - Executing 'load_data-0' immediately...
00:56:46.112 | INFO    | Task run 'load_data-0' - Finished in state Completed()
00:56:46.129 | INFO    | Flow run 'laughing-newt' - Created task run 'get_feature_names-0' for task 'get_feature_names'
00:56:46.129 | INFO    | Flow run 'laughing-newt' - Executing 'get_feature_names-0' immediately...
00:56:46.168 | INFO    | Task run 'get_feature_names-0' - Finished in state Completed()
00:56:46.184 | INFO    | Flow run 'laughing-newt' - Created task run 'split_data-0' for task 'split_data'
00:56:46.185 | INFO    | Flow run 'laughing-newt' - Executing 'split_data-0' immediately...
00:56:46.232 | INFO    | Task run 'split_data-0' - Finished in state Completed()
00:56:46.311 | INFO    | Flow run 'laughing-newt' - Created task run 'log_feature_task-12' for task 'log_feature_task'
00:56:46.312 | INFO    | Flow run 'laughing-newt' - Submitted task run 'log_feature_task-12' for execution.
00:56:46.322 | INFO    | Flow run 'laughing-newt' - Created task run 'log_feature_task-7' for task 'log_feature_task'
00:56:46.323 | INFO    | Flow run 'laughing-newt' - Submitted task run 'log_feature_task-7' for execution.
00:56:46.334 | INFO    | Flow run 'laughing-newt' - Created task run 'log_feature_task-8' for task 'log_feature_task'
00:56:46.335 | INFO    | Flow run 'laughing-newt' - Submitted task run 'log_feature_task-8' for execution.
00:56:46.356 | INFO    | Flow run 'laughing-newt' - Created task run 'log_feature_task-6' for task 'log_feature_task'
00:56:46.357 | INFO    | Flow run 'laughing-newt' - Submitted task run 'log_feature_task-6' for execution.
00:56:46.399 | INFO    | Flow run 'laughing-newt' - Created task run 'log_feature_task-3' for task 'log_feature_task'
00:56:46.400 | INFO    | Flow run 'laughing-newt' - Submitted task run 'log_feature_task-3' for execution.
00:56:46.417 | INFO    | Task run 'log_feature_task-12' - Finished in state Completed()
00:56:46.425 | INFO    | Task run 'log_feature_task-8' - Finished in state Completed()
00:56:46.444 | INFO    | Flow run 'laughing-newt' - Created task run 'log_feature_task-10' for task 'log_feature_task'
00:56:46.445 | INFO    | Flow run 'laughing-newt' - Submitted task run 'log_feature_task-10' for execution.
00:56:46.459 | INFO    | Flow run 'laughing-newt' - Created task run 'log_feature_task-4' for task 'log_feature_task'
00:56:46.461 | INFO    | Flow run 'laughing-newt' - Submitted task run 'log_feature_task-4' for execution.
00:56:46.499 | INFO    | Task run 'log_feature_task-6' - Finished in state Completed()
00:56:46.508 | INFO    | Flow run 'laughing-newt' - Created task run 'log_feature_task-0' for task 'log_feature_task'
00:56:46.509 | INFO    | Flow run 'laughing-newt' - Submitted task run 'log_feature_task-0' for execution.
00:56:46.524 | INFO    | Task run 'log_feature_task-7' - Finished in state Completed()
00:56:46.543 | INFO    | Task run 'log_feature_task-10' - Finished in state Completed()
00:56:46.556 | INFO    | Flow run 'laughing-newt' - Created task run 'log_feature_task-11' for task 'log_feature_task'
00:56:46.557 | INFO    | Flow run 'laughing-newt' - Submitted task run 'log_feature_task-11' for execution.
00:56:46.567 | INFO    | Flow run 'laughing-newt' - Created task run 'log_parameter_task-0' for task 'log_parameter_task'
00:56:46.568 | INFO    | Flow run 'laughing-newt' - Executing 'log_parameter_task-0' immediately...
00:56:46.612 | INFO    | Task run 'log_feature_task-11' - Finished in state Completed()
00:56:46.620 | INFO    | Task run 'log_feature_task-3' - Finished in state Completed()
00:56:46.627 | INFO    | Task run 'log_parameter_task-0' - Finished in state Completed()
00:56:46.640 | INFO    | Flow run 'laughing-newt' - Created task run 'log_parameter_task-1' for task 'log_parameter_task'
00:56:46.641 | INFO    | Flow run 'laughing-newt' - Executing 'log_parameter_task-1' immediately...
00:56:46.651 | INFO    | Task run 'log_feature_task-0' - Finished in state Completed()
00:56:46.668 | INFO    | Flow run 'laughing-newt' - Created task run 'log_feature_task-1' for task 'log_feature_task'
00:56:46.669 | INFO    | Flow run 'laughing-newt' - Submitted task run 'log_feature_task-1' for execution.
00:56:46.696 | INFO    | Task run 'log_parameter_task-1' - Finished in state Completed()
00:56:46.713 | INFO    | Flow run 'laughing-newt' - Created task run 'log_parameter_task-2' for task 'log_parameter_task'
00:56:46.714 | INFO    | Flow run 'laughing-newt' - Executing 'log_parameter_task-2' immediately...
00:56:46.724 | INFO    | Task run 'log_feature_task-1' - Finished in state Completed()
00:56:46.749 | INFO    | Task run 'log_parameter_task-2' - Finished in state Completed()
00:56:46.761 | INFO    | Flow run 'laughing-newt' - Created task run 'fit_pred_model-0' for task 'fit_pred_model'
00:56:46.761 | INFO    | Flow run 'laughing-newt' - Executing 'fit_pred_model-0' immediately...
00:56:46.771 | INFO    | Flow run 'laughing-newt' - Created task run 'log_feature_task-2' for task 'log_feature_task'
00:56:46.772 | INFO    | Flow run 'laughing-newt' - Submitted task run 'log_feature_task-2' for execution.
00:56:46.817 | INFO    | Task run 'fit_pred_model-0' - Finished in state Completed()
00:56:46.839 | INFO    | Flow run 'laughing-newt' - Created task run 'log_metric_task-0' for task 'log_metric_task'
00:56:46.840 | INFO    | Flow run 'laughing-newt' - Executing 'log_metric_task-0' immediately...
00:56:46.854 | INFO    | Task run 'log_feature_task-2' - Finished in state Completed()
00:56:46.871 | INFO    | Task run 'log_feature_task-4' - Finished in state Completed()
00:56:46.885 | INFO    | Task run 'log_metric_task-0' - Finished in state Completed()
00:56:46.971 | INFO    | Flow run 'laughing-newt' - Created task run 'log_feature_task-5' for task 'log_feature_task'
00:56:46.972 | INFO    | Flow run 'laughing-newt' - Submitted task run 'log_feature_task-5' for execution.
00:56:47.004 | INFO    | Task run 'log_feature_task-5' - Finished in state Completed()
00:56:47.083 | INFO    | Flow run 'laughing-newt' - Created task run 'log_feature_task-9' for task 'log_feature_task'
00:56:47.084 | INFO    | Flow run 'laughing-newt' - Submitted task run 'log_feature_task-9' for execution.
00:56:47.132 | INFO    | Task run 'log_feature_task-9' - Finished in state Completed()
00:56:47.152 | INFO    | Flow run 'laughing-newt' - Finished in state Completed()
[9]:
1.0

This time we can use rubicon_ml to inspect our accuracy, among other things!

[10]:
from rubicon_ml import Rubicon


rubicon = Rubicon(persistence="memory", root_dir=".")
project = rubicon.get_project("Wine Classification with Prefect")

experiment = project.experiments()[0]

features = [f.name for f in experiment.features()]
parameters = [(p.name, p.value) for p in experiment.parameters()]
metrics = [(m.name, m.value) for m in experiment.metrics()]

print(
    f"experiment {experiment.id}\n"
    f"features: {features}\nparameters: {parameters}\n"
    f"metrics: {metrics}"
)
experiment 35173154-a912-4ed0-9920-188d62e6f3d2
features: ['proline', 'proanthocyanins', 'flavanoids', 'alcalinity_of_ash', 'nonflavanoid_phenols', 'hue', 'alcohol', 'od280/od315_of_diluted_wines', 'malic_acid', 'ash', 'magnesium', 'total_phenols', 'color_intensity']
parameters: [('n_components', 2), ('n_neighbors', 5), ('is_standardized', True)]
metrics: [('accuracy', 1.0)]
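The features come back in a different order than load_wine defines them. The mapped log_feature_task runs were submitted concurrently and finished out of order, as the run log shows, which most likely explains the logged order. If the original order matters, one option is to reorder the retrieved names against the dataset's feature_names, as in this stdlib sketch. Both lists are copied literally: the first from the output above, the second from load_wine().feature_names.

```python
# Feature order as retrieved from rubicon_ml in the output above.
logged_features = [
    "proline", "proanthocyanins", "flavanoids", "alcalinity_of_ash",
    "nonflavanoid_phenols", "hue", "alcohol", "od280/od315_of_diluted_wines",
    "malic_acid", "ash", "magnesium", "total_phenols", "color_intensity",
]

# Column order of sklearn.datasets.load_wine().feature_names.
dataset_order = [
    "alcohol", "malic_acid", "ash", "alcalinity_of_ash", "magnesium",
    "total_phenols", "flavanoids", "nonflavanoid_phenols", "proanthocyanins",
    "color_intensity", "hue", "od280/od315_of_diluted_wines", "proline",
]

# Sort the logged names by their position in the dataset.
position = {name: i for i, name in enumerate(dataset_order)}
reordered = sorted(logged_features, key=position.__getitem__)
print(reordered == dataset_order)  # True
```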