from collections.abc import Iterator, Mapping
from typing import Any, Optional, cast
import httpx
from slingshot.client import SlingshotClient
from slingshot.types import (
JSON_TYPE,
UNSET,
AssignSettingsSchema,
Page,
ProjectSchema,
QueryParams,
RecommendationDetailsSchema,
)
# Default cap on the number of pages that ``ProjectAPI.iterate_projects`` walks;
# it is used as the default value of that method's ``max_pages`` parameter.
MAX_PAGES: int = 1_000
def _dict_set_if_not_unset(
    source: Mapping[str, Any], destination: dict[str, Any], key: str
) -> None:
    """Copy ``key`` from ``source`` into ``destination`` unless it is UNSET.

    A key that is absent from ``source`` is treated exactly like a key whose
    value is the ``UNSET`` sentinel: ``destination`` is left untouched. Any
    other value, including ``None``, is copied across.

    Args:
        source (Mapping[str, Any]): The mapping to read from.
        destination (dict[str, Any]): The dict to write to (mutated in place).
        key (str): The key to transfer.
    """
    candidate = source.get(key, UNSET)
    if candidate is UNSET:
        # Missing or explicitly unset: there is nothing to transfer.
        return
    destination[key] = candidate
[docs]
class ProjectAPI:
    """Wrapper around the Slingshot endpoints used to manage projects."""

    def __init__(self, client: SlingshotClient):
        """Bind this API wrapper to a Slingshot client.

        Args:
            client (SlingshotClient): The client to store on ``self.client``.
        """
        self.client = client
[docs]
def create(
    self,
    name: str,
    workspace_id: str,
    description: Optional[str] = UNSET,
    app_id: Optional[str] = UNSET,
    job_id: Optional[str] = UNSET,
    cluster_path: Optional[str] = UNSET,
    settings: Optional[AssignSettingsSchema] = UNSET,
) -> ProjectSchema:
    """Create a Slingshot project that optimizes a Databricks job cluster.

    Optional arguments left at their ``UNSET`` default are omitted from the
    request body. Passing ``None`` explicitly includes the field in the
    request body with a ``None`` value.

    Args:
        name (str): The name of the Slingshot project.
        workspace_id (str): The Databricks workspace ID where the job runs.
        description (Optional[str], optional): A description for the
            Slingshot project. Omitted from the request when not provided.
        app_id (Optional[str], optional): The application ID. It must be
            unique across all active (not deleted) projects belonging to a
            Slingshot subscriber and is immutable once the project is
            created. It can be used to search for a project with the
            :meth:`get_projects` and :meth:`iterate_projects` methods.
            Omitted from the request when not provided.
        job_id (Optional[str], optional): The Databricks job ID to
            associate with this Slingshot project. Omitted from the request
            when not provided.
        cluster_path (Optional[str], optional): Identifies the compute
            cluster optimized by this Slingshot project. Omitted from the
            request when not provided. Two forms are accepted:

            - the job cluster name prefixed with "job_clusters/", for a job
              cluster that is available to any task in the job, for
              example "job_clusters/my-cluster";
            - the task name prefixed with "tasks/", for a task-specific
              cluster not available to other tasks in the job, for example
              "tasks/task_1".

            **This field is required if the job has multiple compute
            clusters.** If the job has only one compute cluster, this
            field is optional.

            Each Slingshot project is linked to a single compute cluster in
            Databricks. If the `cluster_path` is not provided for a job
            that has multiple compute clusters, the Slingshot project will
            not be able to retrieve information about the job runs nor
            generate recommendations for optimizing the compute cluster.

            The cluster name is shown in the Databricks UI as the "Cluster
            name" field of a job cluster configuration; in the
            `Databricks API <https://docs.databricks.com/api/workspace/jobs/create#job_clusters-job_cluster_key>`__
            it is called "job_cluster_key". The task name is shown in the
            Databricks UI as the "Task name" field after selecting the task
            in the job configuration; in the
            `Databricks API <https://docs.databricks.com/api/workspace/jobs/create#tasks-task_key>`__
            it is called "task_key".

            With the Databricks Python SDK, first fetch the
            :class:`~databricks.sdk.service.jobs.Job` object:

            >>> from databricks.sdk import WorkspaceClient
            >>> workspace_client = WorkspaceClient()
            >>> job = workspace_client.jobs.get(job_id=1234567890)

            If the job cluster is defined for the job and potentially
            shared across tasks (which is the case for jobs created in the
            Databricks UI), read the `job_cluster_key`:

            >>> cluster_name = job.settings.job_clusters[0].job_cluster_key
            >>> print(f'cluster_path="job_clusters/{cluster_name}"')

            When jobs are created with the Databricks API or SDK, a task
            can be configured with a `new_cluster` that is not shared with
            other tasks. In that case the `job_cluster_key` is not set and
            the `task_key` should be used instead:

            >>> if (cluster_name := job.settings.tasks[0].job_cluster_key):
            ...     print(f'cluster_path="job_clusters/{cluster_name}"')
            ... else:
            ...     task_name = job.settings.tasks[0].task_key
            ...     print(f'cluster_path="tasks/{task_name}"')

            See also:
                - :class:`~databricks.sdk.service.jobs.Job`
                - :class:`~databricks.sdk.service.jobs.JobSettings`
                - :class:`~databricks.sdk.service.jobs.JobCluster`
                - :class:`~databricks.sdk.service.jobs.Task`
        settings (AssignSettingsSchema, optional): A dictionary that sets
            Slingshot project options. Omitted from the request when not
            provided; sent as ``None`` when ``None`` is passed. Only the
            following keys are forwarded, and only when present and not
            ``UNSET``:

            - sla_minutes (Optional[int], optional): The acceptable time
              (in minutes) for the job to complete. The SLA (Service Level
              Agreement) is the maximum time the job should take to
              complete. Slingshot uses this value as an expected upper
              bound when optimizing the job for lowest cost.
            - auto_apply_recs (Optional[bool], optional): Automatically
              apply recommendations. Defaults to False.

    Returns:
        ProjectSchema: The details of the newly created project, taken
            from the ``result`` entry of the API response.
    """
    # The Slingshot API expects "workspaceId" in camelCase; every other key
    # is snake_case.
    payload: JSON_TYPE = {"name": name, "workspaceId": workspace_id}

    # Optional top-level fields, listed in the order they are added to the
    # request body. cluster_path is a job cluster name prefixed by
    # "job_clusters/" or a task name prefixed by "tasks/".
    optional_fields = (
        ("app_id", app_id),
        ("cluster_path", cluster_path),
        ("job_id", job_id),
        ("description", description),
    )
    for field_name, field_value in optional_fields:
        if field_value is not UNSET:
            payload[field_name] = field_value

    if settings is None:
        # An explicit None is forwarded as-is.
        payload["settings"] = None
    elif settings is not UNSET:
        forwarded_settings: dict[str, Any] = {}
        for option in ("sla_minutes", "auto_apply_recs"):
            _dict_set_if_not_unset(settings, forwarded_settings, option)
        payload["settings"] = forwarded_settings

    raw_response = self.client._api_request(
        method="POST",
        endpoint="/v1/projects",
        json=payload,
    )
    response = cast(dict[str, Any], raw_response)
    return cast(ProjectSchema, response.get("result"))
[docs]
def update(
    self,
    project_id: str,
    name: Optional[str] = UNSET,
    workspace_id: Optional[str] = UNSET,
    description: Optional[str] = UNSET,
    job_id: Optional[str] = UNSET,
    cluster_path: Optional[str] = UNSET,
    settings: Optional[AssignSettingsSchema] = UNSET,
) -> ProjectSchema:
    """Update attributes of an existing Slingshot project.

    Only the attributes that are provided are sent in the request.
    Attributes left at their ``UNSET`` default are omitted; attributes set
    to `None` overwrite the project attribute with `None`.

    Args:
        project_id (str): The ID of the Slingshot project to update.
        name (Optional[str], optional): The new name for the Slingshot
            project.
        workspace_id (Optional[str], optional): The new Databricks
            workspace ID where the job runs.

            **Note**: If you are changing the Databricks workspace
            associated with the Slingshot project, you probably also want
            to reset the project using the :meth:`reset` method. This will
            remove all previous job run data from the project, allowing
            Slingshot to re-optimize the job without the influence of
            previous runs.
        description (Optional[str], optional): The new description for the
            Slingshot project.
        job_id (Optional[str], optional): The new Databricks job ID to
            associate with this Slingshot project.

            **Note**: If you are changing the Databricks job associated
            with the Slingshot project, you probably also want to reset
            the project using the :meth:`reset` method. This will remove
            all previous job run data from the project, allowing Slingshot
            to re-optimize the job without the influence of previous runs.
        cluster_path (Optional[str], optional): Identifies the compute
            cluster optimized by this Slingshot project. Two forms are
            accepted:

            - the job cluster name prefixed with "job_clusters/", for a job
              cluster that is available to any task in the job, for
              example "job_clusters/my-cluster";
            - the task name prefixed with "tasks/", for a task-specific
              cluster not available to other tasks in the job, for example
              "tasks/task_1".

            **This field is required if the job has multiple compute
            clusters.** If the job has only one compute cluster, this
            field is optional.

            Each Slingshot project is linked to a single compute cluster in
            Databricks. If the `cluster_path` is not provided for a job
            that has multiple compute clusters, the Slingshot project will
            not be able to retrieve information about the job runs nor
            generate recommendations for optimizing the compute cluster.

            The cluster name is shown in the Databricks UI as the "Cluster
            name" field of a job cluster configuration; in the
            `Databricks API <https://docs.databricks.com/api/workspace/jobs/create#job_clusters-job_cluster_key>`__
            it is called "job_cluster_key". The task name is shown in the
            Databricks UI as the "Task name" field after selecting the task
            in the job configuration; in the
            `Databricks API <https://docs.databricks.com/api/workspace/jobs/create#tasks-task_key>`__
            it is called "task_key".

            With the Databricks Python SDK, first fetch the
            :class:`~databricks.sdk.service.jobs.Job` object:

            >>> from databricks.sdk import WorkspaceClient
            >>> workspace_client = WorkspaceClient()
            >>> job = workspace_client.jobs.get(job_id=1234567890)

            If the job cluster is defined for the job and potentially
            shared across tasks (which is the case for jobs created in the
            Databricks UI), read the `job_cluster_key`:

            >>> cluster_name = job.settings.job_clusters[0].job_cluster_key
            >>> print(f'cluster_path="job_clusters/{cluster_name}"')

            When jobs are created with the Databricks API or SDK, a task
            can be configured with a `new_cluster` that is not shared with
            other tasks. In that case the `job_cluster_key` is not set and
            the `task_key` should be used instead:

            >>> if (cluster_name := job.settings.tasks[0].job_cluster_key):
            ...     print(f'cluster_path="job_clusters/{cluster_name}"')
            ... else:
            ...     task_name = job.settings.tasks[0].task_key
            ...     print(f'cluster_path="tasks/{task_name}"')

            See also:
                - :class:`~databricks.sdk.service.jobs.Job`
                - :class:`~databricks.sdk.service.jobs.JobSettings`
                - :class:`~databricks.sdk.service.jobs.JobCluster`
                - :class:`~databricks.sdk.service.jobs.Task`
        settings (AssignSettingsSchema, optional): A dictionary with
            updates to the options for the Slingshot project. Sent as
            ``None`` when ``None`` is passed. Only the following keys are
            forwarded, and only when present and not ``UNSET``:

            - sla_minutes (Optional[int], optional): The acceptable time
              (in minutes) for the job to complete. The SLA (Service Level
              Agreement) is the maximum time the job should take to
              complete. Slingshot uses this value as an expected upper
              bound when optimizing the job for lowest cost.
            - auto_apply_recs (Optional[bool], optional): Automatically
              apply recommendations.

    Returns:
        ProjectSchema: The details of the updated project, taken from the
            ``result`` entry of the API response.
    """
    # Candidate fields in the order they are added to the request body.
    # cluster_path is a job cluster name prefixed by "job_clusters/" or a
    # task name prefixed by "tasks/". The Slingshot API expects
    # "workspaceId" in camelCase; every other key is snake_case.
    candidate_fields = (
        ("name", name),
        ("cluster_path", cluster_path),
        ("job_id", job_id),
        ("workspaceId", workspace_id),
        ("description", description),
    )
    payload: JSON_TYPE = {}
    for field_name, field_value in candidate_fields:
        if field_value is not UNSET:
            payload[field_name] = field_value

    if settings is None:
        # An explicit None is forwarded as-is.
        payload["settings"] = None
    elif settings is not UNSET:
        forwarded_settings: dict[str, Any] = {}
        for option in ("sla_minutes", "auto_apply_recs"):
            _dict_set_if_not_unset(settings, forwarded_settings, option)
        payload["settings"] = forwarded_settings

    raw_response = self.client._api_request(
        method="PUT",
        endpoint=f"/v1/projects/{project_id}",
        json=payload,
    )
    response = cast(dict[str, Any], raw_response)
    return cast(ProjectSchema, response.get("result"))
[docs]
def delete(self, project_id: str) -> None:
    """Delete the Slingshot project with the given ID.

    Only the Slingshot project is removed; the Databricks job that was
    associated with the project is not affected.

    Args:
        project_id (str): The ID of the Slingshot project to delete.

    Returns:
        None
    """
    endpoint = f"/v1/projects/{project_id}"
    self.client._api_request(method="DELETE", endpoint=endpoint)
[docs]
def reset(self, project_id: str) -> None:
    """Reset a Slingshot project, removing all of its previous job run data.

    Use this to start fresh with the same project, for example when a job
    has changed significantly and should be re-optimized without the
    influence of previous runs (Slingshot uses historical run data to
    optimize the job).

    The Databricks job associated with the project is not affected; its run
    history remains accessible from the Databricks platform.

    Args:
        project_id (str): The ID of the Slingshot project to reset.

    Returns:
        None
    """
    endpoint = f"/v1/projects/{project_id}/reset"
    self.client._api_request(method="POST", endpoint=endpoint)
[docs]
def get_projects(
    self,
    include: Optional[list[str]] = None,
    creator_id: Optional[str] = None,
    app_id: Optional[str] = None,
    job_id: Optional[str] = None,
    page: int = 1,
    size: int = 50,
) -> Page[ProjectSchema]:
    """Retrieve one page of projects matching the given filters.

    Filters left as ``None`` are not sent to the API. An empty ``include``
    list is treated the same as ``None``.

    Args:
        include (Optional[list[str]]): Attributes within
            :class:`ProjectSchema` to include in the response. If not
            provided, all available attributes are included. Defaults to
            None.
        creator_id (Optional[str], optional): The ID of the project creator
            to filter projects by. Defaults to None.
        app_id (Optional[str], optional): The application ID to filter
            projects by. This is an identifier that is unique across all
            projects for a Slingshot subscriber and is set at the time a
            project is created. Defaults to None.
        job_id (Optional[str], optional): The Databricks job ID to filter
            projects by. Defaults to None.
        page (int, optional): The page number to retrieve. Defaults to 1.
        size (int, optional): The number of projects to retrieve per page.
            Defaults to 50.

    Returns:
        Page[ProjectSchema]: The requested page of project details, exactly
            as returned by the API request.
    """
    # cast() only informs the type checker; the integers are passed through
    # to the client unchanged.
    params: QueryParams = {"page": cast(str, page), "size": cast(str, size)}

    if include:
        # pyright is not happy with list[str] although QueryParams allows it
        params["include"] = include  # pyright: ignore

    # Equality filters, in the order they are added to the query string.
    optional_filters = (
        ("creator_id", creator_id),
        ("app_id", app_id),
        ("job_id", job_id),
    )
    for filter_name, filter_value in optional_filters:
        if filter_value is not None:
            params[filter_name] = filter_value

    raw_page = self.client._api_request(
        method="GET", endpoint="/v1/projects", params=params
    )
    return cast(Page[ProjectSchema], raw_page)
[docs]
def iterate_projects(
    self,
    include: Optional[list[str]] = None,
    creator_id: Optional[str] = None,
    app_id: Optional[str] = None,
    job_id: Optional[str] = None,
    size: int = 50,
    max_pages: int = MAX_PAGES,
) -> Iterator[ProjectSchema]:
    """Yield every matching project, fetching one page at a time.

    Pages are requested lazily through :meth:`get_projects` as the caller
    consumes the generator.

    Iteration ends when the last page reported by the API has been
    consumed, when ``max_pages`` pages have been requested, or when a page
    request fails with :class:`httpx.HTTPStatusError`. In the failure case
    a warning is logged and iteration stops without raising, so the caller
    may receive only part of the results. The first page is always
    requested, even if ``max_pages`` is less than 1.

    Args:
        include (Optional[list[str]]): Attributes within
            :class:`ProjectSchema` to include in the response. If not
            provided, all available attributes are included. Defaults to
            None.
        creator_id (Optional[str], optional): The ID of the project creator
            to filter projects by. Defaults to None.
        app_id (Optional[str], optional): The application ID to filter
            projects by. This is an identifier that is unique across all
            projects for a Slingshot subscriber and is set at the time a
            project is created. Defaults to None.
        job_id (Optional[str], optional): The Databricks job ID to filter
            projects by. Defaults to None.
        size (int, optional): The number of projects to retrieve per page.
            Defaults to 50.
        max_pages (int, optional): The maximum number of pages allowed to
            traverse. Defaults to 1000.

    Yields:
        ProjectSchema: A project object, one at a time.
    """
    import logging

    page = 1
    while True:
        # Guard only the request itself, so the handler cannot catch an
        # exception raised while the generator is suspended at the yield.
        try:
            response_page: Page[ProjectSchema] = self.get_projects(
                include=include,
                creator_id=creator_id,
                app_id=app_id,
                job_id=job_id,
                page=page,
                size=size,
            )
        except httpx.HTTPStatusError as error:
            # Best-effort iteration: stop instead of raising, but leave a
            # trace so a truncated result set is not completely silent.
            logging.getLogger(__name__).warning(
                "Stopping project iteration at page %s after HTTP error: %s",
                page,
                error,
            )
            return

        page_number = response_page["page"]
        projects: list[ProjectSchema] = response_page["items"]
        yield from projects

        if page_number >= response_page["pages"]:
            return
        # Enforce max_pages against the local request counter as well as
        # the server-reported page number, so a response that misreports
        # its page cannot keep the loop running past the limit.
        if max(page, page_number) >= max_pages:
            return
        page += 1
[docs]
def get_project(self, project_id: str, include: Optional[list[str]] = None) -> ProjectSchema:
    """Fetch a single project by its ID.

    Args:
        project_id (str): The ID of the project to fetch.
        include (Optional[list[str]]): Attributes within
            :class:`ProjectSchema` to include in the response. If not
            provided (or empty), no ``include`` parameter is sent and all
            available attributes are included. Defaults to None.

    Returns:
        ProjectSchema: The project details, taken from the ``result`` entry
            of the API response.
    """
    # Send "include" only when the caller actually asked for specific fields.
    params: QueryParams = {"include": include} if include else {}
    raw_response = self.client._api_request(
        method="GET",
        endpoint=f"/v1/projects/{project_id}",
        params=params,
    )
    response = cast(dict[str, Any], raw_response)
    return cast(ProjectSchema, response.get("result"))
[docs]
def create_recommendation(self, project_id: str) -> RecommendationDetailsSchema:
    """Request a new recommendation for a Slingshot project.

    Recommendations are suggested changes to Databricks job cluster
    configurations meant to minimize costs while keeping job run time
    within required SLAs. They are generated based on the previous job runs
    associated with the Slingshot project.

    A recommendation can be created once Slingshot has received details
    about a successful job run associated with the project. Slingshot
    begins checking for job runs after a project is linked to a Databricks
    job (or a cluster within that job).

    Immediately after creation the recommendation is in a "PENDING" state,
    meaning it is still being processed. It can be applied if its state is
    "PENDING", "UPLOADING", or "SUCCESS" (but not "FAILURE").

    Note:
        Because the state is still "PENDING" right after creation, the
        returned dictionary lacks the full details of the recommendation.
        Use :meth:`get_recommendation` to retrieve them, like this:

        >>> from slingshot import SlingshotClient
        >>> client = SlingshotClient()
        >>> project_id = "your_project_id"
        >>> # Create a recommendation
        >>> recommendation = client.projects.create_recommendation(project_id)
        >>> # Get the recommendation details
        >>> recommendation_details = client.projects.get_recommendation(
        ...     project_id=project_id, recommendation_id=recommendation["id"]
        ... )

    Args:
        project_id (str): The ID of the project to create a recommendation
            for.

    Returns:
        RecommendationDetailsSchema: A dictionary with details about the
            recommendation that was created, taken from the ``result``
            entry of the API response. The recommendation will have a
            "PENDING" state; pass the returned recommendation ID to
            :meth:`get_recommendation` to obtain the full details.
    """
    endpoint = f"/v1/projects/{project_id}/recommendations"
    raw_response = self.client._api_request(method="POST", endpoint=endpoint)
    response = cast(dict[str, Any], raw_response)
    return cast(RecommendationDetailsSchema, response.get("result"))
[docs]
def get_recommendation(
    self,
    project_id: str,
    recommendation_id: str,
) -> RecommendationDetailsSchema:
    """Fetch one recommendation belonging to a Slingshot project.

    Recommendations are suggested changes to Databricks job cluster
    configurations meant to minimize costs while keeping job run time
    within required SLAs. They are generated based on the previous job runs
    associated with the Slingshot project.

    Args:
        project_id (str): The ID of the project that the recommendation
            belongs to.
        recommendation_id (str): The ID of the recommendation to fetch.

    Returns:
        RecommendationDetailsSchema: A dictionary with details of the
            recommendation, taken from the ``result`` entry of the API
            response.
    """
    endpoint = f"/v1/projects/{project_id}/recommendations/{recommendation_id}"
    raw_response = self.client._api_request(method="GET", endpoint=endpoint)
    response = cast(dict[str, Any], raw_response)
    return cast(RecommendationDetailsSchema, response.get("result"))
[docs]
def apply_recommendation(
    self,
    project_id: str,
    recommendation_id: str,
) -> RecommendationDetailsSchema:
    """Apply a recommendation to the Slingshot project.

    The recommendation is applied to the Databricks job cluster associated
    with the Slingshot project. After the apply request has been made, the
    recommendation is fetched again with :meth:`get_recommendation` and
    returned.

    Recommendations are suggested changes to Databricks job cluster
    configurations meant to minimize costs while keeping job run time
    within required SLAs. They are generated based on the previous job runs
    linked to the Slingshot project.

    A recommendation can be applied if its state is "SUCCESS", "PENDING",
    or "UPLOADING". If the recommendation is in a "FAILURE" state,
    applying it will raise an error.

    Args:
        project_id (str): The ID of the project that the recommendation
            belongs to.
        recommendation_id (str): The ID of the recommendation to apply.

    Returns:
        RecommendationDetailsSchema: A dictionary with details of the
            recommendation that was applied.
    """
    # Apply the recommendation to the project. This raises an error if
    # unsuccessful, so the lookup below only runs after a successful apply.
    apply_endpoint = (
        f"/v1/projects/{project_id}/recommendations/{recommendation_id}/apply"
    )
    self.client._api_request(method="POST", endpoint=apply_endpoint)

    # Retrieve the recommendation after successful application.
    applied = self.get_recommendation(
        project_id=project_id,
        recommendation_id=recommendation_id,
    )
    return applied