View this notebook on GitHub or run it yourself on Binder!


Integrate with Prefect#

rubicon_ml offers an integration with Prefect, an open source workflow management engine used heavily within the Python ecosystem. In Prefect, a unit of work is called a task, and a collection of tasks makes up a flow. A flow represents your workflow pipeline. You can integrate rubicon_ml into your workflows to persist metadata about your experimentation.

We’ll run a Prefect server locally for this example. If you already have Prefect and Docker installed, you can start a Prefect server and agent with these commands:

prefect backend server
prefect server start

# and in a new terminal
prefect agent start

For more context, check out the workflow README on GitHub.

Setting up a simple flow#

Now we can get started! Creating Prefect tasks is easy enough on its own, but we’ve added some simple ones to the rubicon_ml library.

[1]:
from rubicon_ml.workflow.prefect import (
    get_or_create_project_task,
    create_experiment_task,
    log_artifact_task,
    log_dataframe_task,
    log_feature_task,
    log_metric_task,
    log_parameter_task,
)

We’ll need a workflow to integrate rubicon_ml logging into, so let’s put together a simple one. To mimic a realistic example, we’ll create tasks for loading data, splitting said data, extracting features, and training a model.

[2]:
from prefect import task


@task
def load_data():
    from sklearn.datasets import load_wine

    return load_wine()
[3]:
@task
def split_data(dataset):
    from sklearn.model_selection import train_test_split


    return train_test_split(
        dataset.data,
        dataset.target,
        test_size=0.25,
    )
[4]:
@task
def get_feature_names(dataset):
    return dataset.feature_names
[5]:
@task
def fit_pred_model(
    train_test_split_result,
    n_components,
    n_neighbors,
    is_standardized
):
    from sklearn import metrics
    from sklearn.decomposition import PCA
    from sklearn.neighbors import KNeighborsClassifier
    from sklearn.pipeline import make_pipeline
    from sklearn.preprocessing import StandardScaler


    X_train, X_test, y_train, y_test = train_test_split_result

    if is_standardized:
        classifier = make_pipeline(
            StandardScaler(),
            PCA(n_components=n_components),
            KNeighborsClassifier(n_neighbors=n_neighbors),
        )
    else:
        classifier = make_pipeline(
            PCA(n_components=n_components),
            KNeighborsClassifier(n_neighbors=n_neighbors),
        )

    classifier.fit(X_train, y_train)
    pred_test = classifier.predict(X_test)
    accuracy = metrics.accuracy_score(y_test, pred_test)

    return accuracy

Without rubicon_ml, here’s what this simple flow would look like:

[6]:
from prefect import Flow


n_components = 2
n_neighbors = 5
is_standardized = True

with Flow("Wine Classification") as flow:
    wine_dataset = load_data()

    feature_names = get_feature_names(wine_dataset)
    train_test_split = split_data(wine_dataset)

    accuracy = fit_pred_model(
        train_test_split,
        n_components,
        n_neighbors,
        is_standardized,
    )

Running a flow and viewing results#

Now we’ll register our flow with the local server. First, we’ll have to use the prefect CLI one more time to create a project.

prefect create project 'Wine Classification'

The URL printed from the call to flow.register opens the local Prefect UI. We can use it to run and monitor our flows.

[7]:
flow_id = flow.register(project_name="Wine Classification")
Flow URL: http://localhost:8080/default/flow/aef9fb65-c49b-4c46-89be-e31c357a165e
 └── ID: 531ffe8c-fb2b-4855-b5d8-f9bcef51ad2d
 └── Project: Wine Classification
 └── Labels: ['8c859084633f']

We’ll also put together a function to run our flows and wait for them to finish.

[8]:
import time
from prefect import Client


prefect_client = Client()

def run_flow(client, flow_id):
    flow_run_id = client.create_flow_run(flow_id=flow_id)

    is_finished = False
    while not is_finished:
        state = client.get_flow_run_info(flow_run_id).state
        print(f"{state.message.strip('.')}. Waiting...")

        time.sleep(3)

        is_finished = state.is_finished()

    assert state.is_successful()
    print(f"Flow run {flow_run_id} was successful!")

    return flow_run_id

flow_run_id = run_flow(prefect_client, flow_id)
Flow run scheduled. Waiting...
Flow run scheduled. Waiting...
All reference tasks succeeded. Waiting...
Flow run 024457a5-aac4-449a-a5a3-7b3986f90cba was successful!

We assigned a few variables in our flow, most notably the result of fit_pred_model, accuracy. This accuracy metric is how we’ll determine whether or not the model we trained is a success. However, retrieving state variables from flows is a bit cumbersome.

[9]:
info = prefect_client.get_flow_run_info(flow_run_id)

slugs = [t.task_slug for t in info.task_runs]
index = slugs.index(accuracy.slug)

result = info.task_runs[index].state._result.read(
    info.task_runs[index].state._result.location,
)
result.value
[9]:
0.9555555555555556

What’s going on here isn’t exactly intuitive, and all that only extracted one piece of data from one task.

Adding Rubicon to your flow#

We can leverage the Prefect tasks within the rubicon_ml library to log all the info we want about our model run. Then, we can use the standard rubicon_ml logging library to retrieve and inspect our metrics and other logged data. This is much simpler than digging through the state of each executed task and extracting its results.

Here’s the same flow from above, this time with rubicon_ml tasks integrated.

[10]:
import os

from prefect import unmapped


root_dir = os.environ.get("RUBICON_ROOT", "rubicon-root")
root_path = f"{os.path.dirname(os.getcwd())}/{root_dir}"

n_components = 2
n_neighbors = 5
is_standardized = True

with Flow("Wine Classification with Rubicon") as flow:
    project = get_or_create_project_task(
        "filesystem",
        root_path,
        "Wine Classification with Prefect",
    )
    experiment = create_experiment_task(
        project,
        name="logged from a Prefect task",
    )

    wine_dataset = load_data()
    feature_names = get_feature_names(wine_dataset)
    train_test_split = split_data(wine_dataset)

    log_feature_task.map(unmapped(experiment), feature_names)
    log_parameter_task(experiment, "n_components", n_components)
    log_parameter_task(experiment, "n_neighbors", n_neighbors)
    log_parameter_task(experiment, "is_standardized", is_standardized)

    accuracy = fit_pred_model(
        train_test_split,
        n_components,
        n_neighbors,
        is_standardized,
    )

    log_metric_task(experiment, "accuracy", accuracy)

Again, we’ll register and run the flow.

[11]:
flow_with_rubicon_id = flow.register(project_name="Wine Classification")
flow_run_with_rubicon_id = run_flow(prefect_client, flow_with_rubicon_id)
Flow URL: http://localhost:8080/default/flow/9b91bafc-7e8b-4361-b1d0-bc57dac4cebc
 └── ID: 081b9522-5db8-4ce6-ae64-ea8afd5b323e
 └── Project: Wine Classification
 └── Labels: ['8c859084633f']
Flow run scheduled. Waiting...
Submitted for execution. Waiting...
Running flow. Waiting...
All reference tasks succeeded. Waiting...
Flow run 29cef175-c115-4da2-a9f7-40f79bdb6104 was successful!

This time we can use rubicon_ml to inspect our accuracy, among other things!

[12]:
from rubicon_ml import Rubicon


rubicon = Rubicon(persistence="filesystem", root_dir=root_path)
project = rubicon.get_project("Wine Classification with Prefect")

experiment = project.experiments()[0]

features = [f.name for f in experiment.features()]
parameters = [(p.name, p.value) for p in experiment.parameters()]
metrics = [(m.name, m.value) for m in experiment.metrics()]

print(
    f"experiment {experiment.id}\n"
    f"features: {features}\nparameters: {parameters}\n"
    f"metrics: {metrics}"
)
experiment 6b1c3846-5aab-4bda-8c16-15b279d8616c
features: ['alcalinity_of_ash', 'alcohol', 'ash', 'color_intensity', 'flavanoids', 'hue', 'magnesium', 'malic_acid', 'nonflavanoid_phenols', 'od280/od315_of_diluted_wines', 'proanthocyanins', 'proline', 'total_phenols']
parameters: [('is_standardized', True), ('n_components', 2), ('n_neighbors', 5)]
metrics: [('accuracy', 1.0)]

Extracting data from rubicon_ml is much simpler than pulling it from the various tasks as they succeed. Especially in the case where we’re running thousands of tasks in a flow.

Concurrent Logging with Dask#

Prefect plays nicely with Dask in order to provide parallel computing when possible. The Prefect scheduler is smart enough to know which tasks do and do not depend on each other, so it can intelligently decide when to run independent tasks at the same time.

Let’s run the same flow as above, except this time we’ll log eight different experiments with eight different feature sets and accuracy results. First, we’ll need to use Dask to start a local cluster.

[13]:
import dask.distributed
from prefect.executors import DaskExecutor


dask_client = dask.distributed.Client()
dask_executor = DaskExecutor(address=dask_client.cluster.scheduler.address)

Before we look at the new flow, let’s see how easy it is to make our own rubicon_ml Prefect tasks. Currently, the log_feature_task logs one feature to one experiment. Let’s say in this example, we want to log our entire feature set in one task. That’s slightly different than what we currently have in the log_feature_task, so let’s see how we can make a new one using the rubicon_ml logging library.

[14]:
@task
def log_feature_set(experiment, feature_names):
    """log a set of features to a rubicon experiment

    Parameters
    ----------
    experiment : rubicon.Experiment
        the experiment to log the feature set to
    feature_names : list of str
        the names of the features to log to `experiment`
    """
    features = []

    for name in feature_names:
        features.append(experiment.log_feature(name=name))

    return features

Easy! Even though its so simple, this particular task is actually more in-depth than the ones you’ll find in the library. The Prefect tasks in the library are simply wrappers around existing logging library functions, like experiment.log_feature.

Now we can make our new flow to log multiple experiments in parallel.

[15]:
n_components =    [2,    2,     2,    2,     4,    4,     4,    4    ]
n_neighbors =     [5,    5,     10,   10,    5,    5,     10,   10   ]
is_standardized = [True, False, True, False, True, False, True, False]

experiment_names = [f"mapped run {i}" for i in range(len(n_components))]

with Flow(
    "Wine Classification with Rubicon - Mapped",
    executor=dask_executor,
) as mapped_flow:
    project = get_or_create_project_task(
        "filesystem",
        root_path,
        "Wine Classification with Prefect - Mapped",
    )
    experiments = create_experiment_task.map(
        unmapped(project),
        name=experiment_names,
        description=unmapped("concurrent example with Prefect"),
    )

    wine_dataset = load_data()

    feature_names = get_feature_names(wine_dataset)
    train_test_split = split_data(wine_dataset)

    log_feature_set.map(experiments, unmapped(feature_names))
    log_parameter_task.map(experiments, unmapped("n_components"), n_components)
    log_parameter_task.map(experiments, unmapped("n_neighbors"), n_neighbors)
    log_parameter_task.map(experiments, unmapped("is_standardized"), is_standardized)

    accuracies = fit_pred_model.map(
        unmapped(train_test_split),
        n_components,
        n_neighbors,
        is_standardized,
    )

    log_metric_task.map(experiments, unmapped("accuracy"), accuracies)

Let’s register and run one last flow. If you check out the timeline for your completed flows on the UI linked by mapped_flow.register, you’ll notice tasks executing at the same time now.

[16]:
flow_with_concurrent_rubicon_id = mapped_flow.register(
    project_name="Wine Classification",
)
flow_run_with_concurrent_rubicon_id = run_flow(
    prefect_client,
    flow_with_concurrent_rubicon_id,
)
Flow URL: http://localhost:8080/default/flow/1ff30ad8-530c-4502-a9ad-cfed39b145fd
 └── ID: 5f925f68-877c-4922-952c-927da5cf5a34
 └── Project: Wine Classification
 └── Labels: ['8c859084633f']
Flow run scheduled. Waiting...
Flow run scheduled. Waiting...
Running flow. Waiting...
Running flow. Waiting...
Running flow. Waiting...
All reference tasks succeeded. Waiting...
Flow run 418550e0-01eb-4d48-b78b-fbe101da0622 was successful!

Retrieving all the results of a mapped task is even more cumbersome than retrieving just the accuracy above. We’ll simply use the rubicon_ml Dashboard to check out all the data we just logged!

[17]:
from rubicon_ml.viz import Dashboard


project = get_project(
    "filesystem",
    root_path,
    "Wine Classification with Prefect - Mapped",
)

Dashboard(project.experiments()).show()
 * Running on http://127.0.0.1:8050/ (Press CTRL+C to quit)
127.0.0.1 - - [16/Apr/2021 15:29:39] "GET /_alive_71180c28-37bc-4127-a50b-e754fe3e6812 HTTP/1.1" 200 -
Dash app running on http://127.0.0.1:8050/