Alerts
Use alerts to get notifications about monitored activities such as the appearance or spread of risky files on your endpoints. The Carbon Black Cloud Python SDK provides an easy way to search, investigate, and set the workflow of Alerts using Python classes instead of raw requests.
You can use all the operations shown in the API, such as retrieving, filtering, closing, and adding notes to the
alert or the associated threat.
You can locate the full list of operations and attributes in the Alert()
class.
Resources
API Documentation on Developer Network
Alert Search Fields on Developer Network
Example script in GitHub
If you are updating from SDK version 1.4.3 or earlier, see the `alerts-migration`_ guide.
If you are updating from Notifications, see the `notification-migration`_ guide.
Note
In Alerts v7, and therefore SDK 1.5.0 onwards, Observed Alerts are not included; they are Observations. The field category has been removed from Alert. In other APIs where this field remains, it always has the value THREAT.
More information is available
here.
Retrieve Alerts
The following example retrieves the first 5 alerts (using the slice [:5]) that have a minimum severity level of 7.
>>> from cbc_sdk import CBCloudAPI
>>> from cbc_sdk.platform import Alert
>>> api = CBCloudAPI(profile='sample')
>>> alerts = api.select(Alert).set_minimum_severity(7)[:5]
>>> print(alerts[0].id, alerts[0].device_os, alerts[0].device_name)
d689e626-5d6a-<truncated> WINDOWS Alert-WinTest
Filter Alerts
Filter alerts by using the fields described in the Alert Search Schema.
Set required values for specific fields by using the add_criteria()
method to limit the number of returned alerts.
Use this method for fields that are identified in the Alert Search Fields
with “Searchable Array”.
The following snippets limit the results to specific devices. In the first, the device_id values are integers; in the second, the device_target_value values are strings.
>>> from cbc_sdk import CBCloudAPI
>>> from cbc_sdk.platform import Alert
>>> api = CBCloudAPI(profile='sample')
>>> alerts = api.select(Alert).add_criteria("device_id", [123, 456])
>>> alerts = api.select(Alert).add_criteria("device_target_value", ["MISSION_CRITICAL", "HIGH"])
Fields in the Alert Search Fields identified only with “Searchable” require the criteria to be a single value instead of a list of values. The SDK has hand-crafted methods to set the criteria for these fields.
The following code snippet shows the methods for alert_notes_present and minimum_severity, and the number of alerts that meet each criterion.
>>> alerts = api.select(Alert).set_alert_notes_present(True)
>>> print(len(alerts))
3
>>> alerts = api.select(Alert).set_minimum_severity(9)
>>> print(len(alerts))
1072
>>> alerts = api.select(Alert).set_minimum_severity(3)
>>> print(len(alerts))
69100
You can use the where method to define a custom query to filter alerts. The where method supports strings and solr-like queries. Alternatively, you can use solrq query objects for more complex searches. The following example uses a solr query string to search for alerts where the device_target_value is MISSION_CRITICAL or HIGH; it is equivalent to the preceding add_criteria clause.
>>> from cbc_sdk import CBCloudAPI
>>> from cbc_sdk.platform import Alert
>>> api = CBCloudAPI(profile='sample')
>>> alerts = api.select(Alert).where("device_target_value:MISSION_CRITICAL OR device_target_value:HIGH")
>>> for alert in alerts:
...     print(alert.id, alert.device_os, alert.device_name, alert.device_target_value)
8aa6272a-17cb-31c0-9352-67e45c0251f3 WINDOWS jenkin MISSION_CRITICAL
d987a112-8b7b-18c9-43d9-76ced09d9ded WINDOWS MYDEMOMACHINE\DESKTOP-04 MISSION_CRITICAL
0f915c4d-5652-b3e5-50d8-f4dcfc632396 WINDOWS jenkin MISSION_CRITICAL
1f13e581-840f-1207-f661-d9b176ee9d6c WINDOWS jenkin MISSION_CRITICAL
6ae56007-1213-4ee1-a50c-d221066ce8c9 WINDOWS MYBUILDMACHINE\Desktop-01 HIGH
... truncated ...
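When the list of values is only known at run time, the query string passed to where can be assembled programmatically. The following is a minimal sketch: build_or_query is a hypothetical helper, not part of the SDK, and it assumes the values need no escaping or quoting.

```python
def build_or_query(field, values):
    """Join field:value terms with OR (hypothetical helper, not part of the SDK).

    Assumes the values contain no spaces or special characters that need escaping.
    """
    if not values:
        raise ValueError("at least one value is required")
    return " OR ".join("{}:{}".format(field, value) for value in values)


query = build_or_query("device_target_value", ["MISSION_CRITICAL", "HIGH"])
print(query)
```

The resulting string can then be passed to the where method in the same way as the literal query string in the example above.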
Tip
When filtering by fields that take a list parameter, an empty list is treated as a wildcard and matches everything.
For example, the following snippet returns all types:
>>> alerts = api.select(Alert).set_types([])
It is equivalent to:
>>> alerts = api.select(Alert)
Tip
More information about solrq can be found in its documentation.
Export Alerts in CSV format
Up to 25,000 alerts can be exported in a CSV file.
This is an asynchronous process in Carbon Black Cloud. To use the APIs directly, three calls are required: start the job, check the status until it completes, then download the results. The SDK wraps these calls and simplifies the code needed.
Modify the following example with criteria to meet your needs.
>>> from cbc_sdk import CBCloudAPI
>>> from cbc_sdk.platform import Alert
>>> api = CBCloudAPI(profile="YOUR_PROFILE_HERE")
>>> alert_query = api.select(Alert).add_criteria("device_os", "WINDOWS").set_minimum_severity(3)\
... .set_time_range(range="-10d")
>>> job = alert_query.export()
>>> job.await_completion().result()
>>> csv_report = job.get_output_as_string()
>>> print(csv_report)
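Once the report is available as a string, it can be processed with Python's standard csv module instead of being printed. The following is a minimal sketch: the csv_report value below is a hand-written stand-in for the string returned by get_output_as_string(), and its column names and values are illustrative assumptions, not real export output.

```python
import csv
import io

# Stand-in for the string returned by job.get_output_as_string().
# The column names and values are illustrative assumptions.
csv_report = "id,severity,device_os\nalert-1,7,WINDOWS\nalert-2,3,WINDOWS\n"

# DictReader uses the first row as the header and yields one dict per data row.
rows = list(csv.DictReader(io.StringIO(csv_report)))

# CSV values are strings, so convert before comparing numerically.
high_severity_ids = [row["id"] for row in rows if int(row["severity"]) >= 7]
print(len(rows), high_severity_ids)
```

Check the header row of a real export before relying on specific column names.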
Retrieving Alerts for Multiple Organizations
By using the following example, you can retrieve alerts for multiple organizations. Ensure you have a profile created for each org in the cbc credential file.
>>> from cbc_sdk import CBCloudAPI
>>> from cbc_sdk.platform import Alert
>>> org_list = ["org1", "org2"]
>>> for org in org_list:
...     # Each entry must match a profile name in the credentials file
...     api = CBCloudAPI(profile=org)
...     alerts = api.select(Alert).set_minimum_severity(7)[:5]
...     print("Results for Org {}".format(org))
...     # Print inside the outer loop so that the alerts of every org are shown
...     for alert in alerts:
...         print(alert.id, alert.device_os, alert.device_name, alert.type)
You can also read from a CSV file by using values that match the profile names in a credentials.cbc file.
>>> from cbc_sdk import CBCloudAPI
>>> from cbc_sdk.platform import Alert
>>> import csv
>>> with open("data.csv", "r", encoding="utf-8-sig") as file:
...     org_list = list(csv.reader(file, delimiter=","))
>>> for org in org_list:
...     # Each CSV row is a list of cells; join them into a single profile name
...     org = "".join(org)
...     api = CBCloudAPI(profile=org)
...     alerts = api.select(Alert).set_minimum_severity(7)[:5]
...     print("Results for Org {}".format(org))
...     # Print inside the outer loop so that the alerts of every org are shown
...     for alert in alerts:
...         print(alert.id, alert.device_os, alert.device_name, alert.type)
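The CSV example above joins all cells of a row into one profile name, so it assumes one name per row. If the file might contain several names per row, stray whitespace, or blank lines, a slightly more defensive reader can help. The following is a minimal sketch: read_profile_names is a hypothetical helper, not part of the SDK, and the sample text stands in for the contents of data.csv.

```python
import csv
import io


def read_profile_names(csv_text):
    """Return a flat list of non-empty profile names from CSV text (hypothetical helper)."""
    names = []
    for row in csv.reader(io.StringIO(csv_text)):
        for cell in row:
            name = cell.strip()
            if name:  # skip empty cells and blank lines
                names.append(name)
    return names


# Sample content standing in for data.csv; the org names are placeholders.
sample = "org1\norg2, org3\n\n"
print(read_profile_names(sample))
```

Each returned name can then be used as the profile argument, as in the loop above.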
Grouping Alerts
The examples below illustrate how to create and manipulate grouped alert objects. A Grouped Alert is a collection of alerts that share a common threat id. Grouping alerts by threat id provides greater context and insight into the pervasiveness of a threat.
This first example retrieves all groupings of watchlist alerts from the past 10 days that have a minimum severity level of 3. If this feels similar to basic alert retrieval, the only difference of note at this stage is that we select a GroupedAlert instead of an Alert.
>>> from cbc_sdk import CBCloudAPI
>>> from cbc_sdk.platform import GroupedAlert
>>> api = CBCloudAPI(profile="sample")
>>> grouped_alert_search_query = api.select(GroupedAlert)
>>> grouped_alert_search_query = grouped_alert_search_query.set_time_range(range="-10d").add_criteria("type", "WATCHLIST").set_minimum_severity(3)
>>> # trigger the search to execute:
>>> grouped_alert = grouped_alert_search_query.first()
>>> print("Number of groups: {}, Total alerts in all groups {}".format(grouped_alert_search_query._total_results, grouped_alert_search_query._group_by_total_count))
Number of groups: 19, Total alerts in all groups 2454
Also like Alerts, first() can be used on the query to retrieve the first grouping of alerts and study the metadata for a given threat id.
>>> first_alert_grouping = grouped_alert_search_query.first()
>>> print(first_alert_grouping.count, first_alert_grouping.highest_severity, first_alert_grouping.device_count, first_alert_grouping.workflow_states)
534 7 3 {'OPEN': 534}
>>> most_recent_alert = first_alert_grouping.most_recent_alert_
>>> print(most_recent_alert.threat_id)
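The count, highest_severity, device_count and workflow_states values printed above summarize the alerts in one group. The grouping itself is performed by Carbon Black Cloud; the following is only a client-side illustration that uses hypothetical plain dictionaries in place of Alert objects, and the reading of each field is an assumption based on its name.

```python
from collections import Counter, defaultdict

# Hypothetical, simplified alert records; real Alert objects carry many more fields.
alerts = [
    {"threat_id": "T1", "severity": 7, "device_id": 1, "status": "OPEN"},
    {"threat_id": "T1", "severity": 5, "device_id": 2, "status": "OPEN"},
    {"threat_id": "T2", "severity": 3, "device_id": 1, "status": "CLOSED"},
]

# Bucket the alerts by their shared threat id.
by_threat = defaultdict(list)
for alert in alerts:
    by_threat[alert["threat_id"]].append(alert)

# Derive the per-group summary values.
groups = {}
for threat_id, members in by_threat.items():
    groups[threat_id] = {
        "count": len(members),
        "highest_severity": max(a["severity"] for a in members),
        "device_count": len({a["device_id"] for a in members}),
        "workflow_states": dict(Counter(a["status"] for a in members)),
    }

print(groups["T1"])
```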
It may be necessary to retrieve all of the alerts from a threat id grouping for further inspection. You can retrieve the associated alert search query directly from a given grouped alert:
>>> alert_search_query = first_alert_grouping.get_alert_search_query()
>>> alerts = alert_search_query.all()
It is also possible to create grouped facets from the grouped alert search query:
>>> grouped_alert_facets = grouped_alert_search_query.facets(["type", "THREAT_ID"], 0, True)
Suppose that, instead of grouped alerts, you had been working with alerts and wanted to cross over to grouped alerts. Instead of building a new grouped alert query from scratch, you can transform an alert search query into a grouped alert search query, or vice versa.
>>> from cbc_sdk import CBCloudAPI
>>> from cbc_sdk.platform import Alert, GroupedAlert
>>> api = CBCloudAPI(profile="sample")
>>> alert_search_query = api.select(Alert)
>>> alert_search_query = alert_search_query.set_time_range(range="-10d").add_criteria("type", "WATCHLIST").set_minimum_severity(3)
>>> group_alert_search_query = alert_search_query.set_group_by("threat_id")
>>> alert_search_query = group_alert_search_query.get_alert_search_query()
Note
When transforming from one query type to another, the sort order parameter is not preserved. If it is needed, it must be added to the query's criteria manually.
Retrieving Observations to Provide Context About an Alert
All alert types other than Watchlist Alerts have associated Observations that provide more information about the interesting events that contributed to the identification of an Alert.
The Alert v7 object (supported in SDK 1.5.0 onwards) has significantly more metadata than the earlier Alerts v6 API (in SDK version 1.4.3 and earlier). Therefore, the enrichment might not be required, depending on your use case. New fields include process, child process, and parent process command lines, and IP addresses for network events. Find the complete list of fields in the Alert Search Fields.
Observations are part of the Investigate Search Fields. Available fields are identified by the route “Observation”.
The methods available on an Observation are listed in the Observation() class.
To retrieve the entire Observation details, including fields marked with OBSERVATION*** in the Investigate Search Fields, use get_details() on the Observation object.
>>> from cbc_sdk import CBCloudAPI
>>> from cbc_sdk.platform import Alert
>>> api = CBCloudAPI(profile="sample")
>>> alert = api.select(Alert).add_criteria("type", "CB_ANALYTICS").first()
>>> observations = alert.get_observations()
>>> observations
[<cbc_sdk.platform.observations.Observation: id a5aa40856d5511ee8059132eb84e1d6d:470147c9-d79b-3f01-2083-b30bc0c0629f> @ https://defense.conferdeploy.net]
>>> print(observations[0])
Observation object, bound to https://defense.conferdeploy.net.
------------------------------------------------------------------------------
alert_id: [list:1 item]:
[0]: 470147c9-d79b-3f01-2083-b30bc0c0629f
backend_timestamp: 2023-10-18T01:28:59.900Z
blocked_effective_reputation: KNOWN_MALWARE
blocked_hash: [list:1 item]:
[0]: 659e469f8dadcb6c32ab1641817ee57c327003dffa443c3...
blocked_name: c:\windows\system32\fltlib.dll
childproc_effective_reputation: KNOWN_MALWARE
childproc_effective_reputation_source: HASH_REP
childproc_hash: [list:1 item]:
[0]: 659e469f8dadcb6c32ab1641817ee57c327003dffa443c3...
... truncated ...
Retrieving Processes to Provide Context About an Alert
You can retrieve process details on any Alert with a process_guid. You can use list slicing to retrieve the first n results (in the example, this value is 10).
The full list of attributes and methods is in the Process() class.
To retrieve the entire process details, including fields marked with PROCESS*** in the Investigate Search Fields, use get_details() on the Process object.
>>> from cbc_sdk import CBCloudAPI
>>> from cbc_sdk.platform import WatchlistAlert, Process
>>> api = CBCloudAPI(profile='sample')
>>> alerts = api.select(WatchlistAlert)[:10]
>>> for alert in alerts:
...     process = alert.get_process()
...     print(process)
{'alert_id': ['0a3c45bf-fce6-4a63', '12030b8f-ce3f-48bd'], 'attack_tactic': 'TA0002' <truncated>..}
{'alert_id': ['02f6aecd-73d7-456d', 'e47c13dd-75a9-44de'], 'attack_tactic': 'TA0002' <truncated>..}
... truncated ...
Get Process Events
You can fetch every event that corresponds to a Process by calling process.events().
Note
Because fetching events can be an intensive task, the following example fetches only the first 10 events. Be cautious when calling all().
>>> from cbc_sdk import CBCloudAPI
>>> from cbc_sdk.platform import WatchlistAlert, Process
>>> api = CBCloudAPI(profile='sample')
>>> alert = api.select(WatchlistAlert).first()
>>> process = alert.get_process()
>>> events = process.events()[:10]
>>> print(events[0].event_description)  # The `<share>` and `<link>` tags, which are also available in the response, have been stripped from the output shown here.
'The application c:\\program files (x86)\\google\\chrome\\application\\chrome.exe attempted to modify the memory of "c:\\program files (x86)\\google\\chrome\\application\\chrome.exe", by calling the function "NtWriteVirtualMemory". The operation was successful.'
...
Device Control Alerts
Device Control Alerts are explained in the Device Control guide.
Container Runtime Alerts
Container Runtime Alerts represent alerts for behavior that is noticed inside a Kubernetes container. These alerts are based on network traffic and are triggered by anomalies from the learned behavior of workloads or applications. For these events, the type is CONTAINER_RUNTIME. Additional fields such as connection_type and egress_group_name are also available.
To see all available fields, filter Alert Types Supported to CONTAINER_RUNTIME on the Alert Search Fields.
Alert Workflow
The Alert Closure workflow enables Alert lifecycle management.
An alert goes through the states of Open, In Progress, and Closed. Any transition can occur, including from Closed back to Open or In Progress.
The workflow leverages the alert search structure to specify the alerts to close.
Use an Alert Search to specify which Alerts will have their status updated.
The request body is a search request and all alerts matching the request will be updated.
Two common uses are to update one alert, or to update all alerts with a specific threat id.
Any search request can be used as the criteria to select alerts to update the alert status.
>>> # This query will select only the alert with the specified id
>>> ALERT_ID = "id of the alert that you want to close"
>>> alert_query = api.select(Alert).add_criteria("id", [ALERT_ID])
>>> # This query will select all alerts with the specified threat id. It is not used again in this example
>>> alert_query_for_threat = api.select(Alert).add_criteria("threat_id","CFED0B211ED09F8EC1C83D4F3FBF1709")
Submit a job to update the status of Alerts.
The status can be OPEN, IN_PROGRESS or CLOSED (previously DISMISSED). You may include a Closure Reason.
>>> # By calling update on the alert_query, a request to change the status
>>> # for all alerts matching that criteria will be submitted
>>> job = alert_query.update("CLOSED", "RESOLVED", "NONE", "Setting to closed for SDK demo")
The immediate response confirms that the job was successfully submitted.
>>> print("job.id = {}".format(job.id))
job.id = 1234567
Use the Job() class (cbc_sdk.platform.jobs.Job) to determine when the update is complete. Use the Job object to wait until the Job has completed; the Python script waits while the SDK polls to determine when the job is complete.
>>> completed_job = job.await_completion().result()
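Conceptually, waiting for a job means polling its status until it reaches a terminal state or a timeout elapses. The following is a minimal sketch of that pattern and not the SDK's implementation of await_completion(): FakeJob, get_status, the status strings, and the interval values are all hypothetical stand-ins chosen for illustration.

```python
import time


class FakeJob:
    """Hypothetical stand-in for a server-side job; completes on the third status check."""

    def __init__(self):
        self.checks = 0

    def get_status(self):
        self.checks += 1
        return "COMPLETED" if self.checks >= 3 else "IN_PROGRESS"


def wait_for(job, poll_interval=0.01, timeout=5.0):
    """Poll the job status until it completes or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if job.get_status() == "COMPLETED":
            return job
        time.sleep(poll_interval)  # pause between checks to avoid a busy loop
    raise TimeoutError("job did not complete in time")


finished = wait_for(FakeJob())
print(finished.checks)
```

With the SDK, this loop is not needed; the await_completion() call shown above handles the waiting.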
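Refresh the alert to get the updated alert data into the SDK. Here, alert is assumed to be an Alert object that was retrieved before the update, for example with alert_query.first().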
>>> alert.refresh()
>>> print("Status = {}, Expecting CLOSED".format(alert.workflow["status"]))
You can dismiss future Alerts that have the same threat id.
Use the following sequence of calls to update future alerts that have the same threat id. This sequence is usually used in conjunction with the alert closure; that is, you can use the dismiss future alerts call to close future occurrences and call an alert closure to close current open alerts that have the threat id.
>>> alert_threat_query = api.select(Alert).add_criteria("threat_id","CFED0B211ED09F8EC1C83D4F3FBF1709")
>>> alert.dismiss_threat("threat remediation done", "testing dismiss_threat in the SDK")
>>> # To undo the dismissal, call update
>>> alert.update_threat("threat remediation un-done", "testing update_threat in the SDK")
High Volume and Streaming Solution for Alerts
For near-real-time streaming of alerts, see Data Forwarder.