Searching
Almost every interaction with the Carbon Black Cloud SDK will involve searching for some object on the server that your code can then inspect or operate on. Searching in the SDK involves three steps:
Create a query object with the
select()
method.Refine the query by using the query object’s methods to add a text query and/or search criteria.
Execute the query to see its results.
Creating a Query Object
A query object is created via the CBCloudAPI.select()
operation, specifying
the type of data to be retrieved.
In this example, we create a query to search for all devices with antivirus active:
# assume the CBCloudAPI object is in the variable "api"
>>> from cbc_sdk.platform import Device
>>> device_query = api.select(Device).where('status:ACTIVE')
# The device query has been created but not yet executed
>>> type(device_query)
<class 'cbc_sdk.platform.devices.DeviceSearchQuery'>
The select()
method may take either a class reference or a string class name:
>>> query1 = api.select(Device)
>>> query2 = api.select("Device")
# prove that the query we get back in either case is the same
>>> type(query1) == type(query2)
True
Selecting an Object Directly
The select()
method may also be used to retrieve an object directly if you know its ID value, by passing the ID as
a second parameter:
>>> dev = api.select(Device, 1234567) # assume this device exists
>>> type(dev)
<class 'cbc_sdk.platform.devices.Device'>
Refining a Query
Queries may support one of two different methods for refining a query:
Through the use of text query.
Through adding criteria.
Text Query Support
Text queries may be added to a query object by using the query object’s
where()
, and_()
, and
or_()
methods. The following example sets up a query looking for events
in which the program googleupdate.exe
accesses the system registry on a device with a specific hostname, IP
address, and owner:
# assume the CBCloudAPI object is in the variable "api"
>>> from cbc_sdk.platform import Observation
>>> obs_query = api.select(Observation).where(process_name='svchost.exe').and_(observation_type='CONTEXTUAL_ACTIVITY')
# further refine the query
>>> obs_query.and_(event_type='netconn')
>>> obs_query.and_(netconn_protocol='PROTO_TCP').and_(netconn_port=80)
The where()
method supplies the initial query parameters, while and_()
and or_()
add additional query
parameters. As with other languages, and_()
gets grouped together before or_()
.
Parameters may either be supplied as text strings or as keyword assignments:
>>> from cbc_sdk.platform import Device
# the following two queries are equivalent
>>> string_query = api.select(Device).where("status:ACTIVE")
>>> keyword_query = api.select(Device).where(status="ACTIVE")
However, mixing the two types in a single query is not allowed:
# this is not allowed
>>> from cbc_sdk.platform import Device
>>> bogus_query = api.select(Device).where(status="ACTIVE").and_("virtualMachine:true")
cbc_sdk.errors.ApiError: Cannot modify a structured query with a raw parameter
Criteria Support
Criteria are usually added to queries using methods specific to each query. For example, this query looks for alerts with severity 9 or 10 on a machine running macOS 10.14.6:
>>> from cbc_sdk.platform import Alert
>>> alert_query = api.select(Alert)
# Refine the query with parameters
>>> alert_query.where(alert_severity=9).or_(alert_severity=10)
# Refine the query with criteria
>>> alert_query.set_device_os(["MAC"]).set_device_os_versions(["10.14.6"])
This query produces the following JSON block to be passed to a POST
request to the server:
{
"query": "alert_severity:9 OR alert_severity:10",
"criteria": {
"device_os": ["MAC"],
"device_os_version": ["10.14.6"]
}
}
In newer queries, the various specific methods for setting each individual criterion will be replaced with a single method:
# Refine the query with criteria (new style)
>>> alert_query.add_criteria("device_os", ["MAC"]).add_criteria("device_os_version", ["10.14.6"])
Note
The add_criteria()
method is explicitly supported with Alerts v7, as well as other query classes that make use
of CriteriaBuilderSupportMixin
. Over time, the existing “specific” methods for setting criteria will be
deprecated.
Certain queries accept a time range criterion, set with the set_time_range()
method. This allows a range of
times to be specified which returned objects must fall within. Parameters for set_time_range()
are as follows:
start
: Specifies the starting time of the range, in ISO 8601 format.end
: Specifies the ending time of the range, in ISO 8601 format.range
: Specifies the scope of the request in units of time.
A range
parameter begins with a minus sign, marking an interval backwards from the current time. This is followed
by an integer number of units, followed by a letter specifying whether the interval is years (‘y’), weeks (‘w’),
days (‘d’), hours (‘h’), minutes (‘m’), or seconds (‘s’).
Note
For Process
search, the range
parameter is called window
.
When setting a time range, either start
and end
must both be specified, or range
must be specified.
range
takes precedence if it is specified alongside start
and/or end
.
Executing a Query
To execute a query after it’s been refined, simply evaluate the query in an iterable context. This may be done
either by passing it to a function that takes iterable values, or by iterating over it in a for
loop. This
example shows how a device query may be executed:
# create and refine a device query
>>> from cbc_sdk.platform import Device
>>> device_query = api.select(Device).where('status:ACTIVE').set_os(["WINDOWS"])
# easiest way to execute it is to turn it into a list
>>> matching_devices = list(device_query)
# or you can iterate over it using a for loop
>>> for matching_device in device_query:
... print(f"Matching device ID: {matching_device.id})
...
Matching device ID: 1234
Matching device ID: 5678
# using it in a list comprehension also works
>>> matching_device_ids = [device.id for device in device_query]
>>> print(matching_device_ids)
[1234, 5678]
# you can also use the standard Python len() function to return the number of results
>>> print(len(device_query))
2
The first()
or one()
methods on a query always return the first object matched by that query. The difference
between those is that, if there is more than one result for that query, the one()
method will raise an error.
Asynchronous Queries
Some queries may also be executed asynchronously by using the execute_async()
method, which is useful if you have
a query which wil take a long time to execute and you want your script to do other things while waiting for the query
to return. Here’s how we execute the device query from the last example asynchronously:
# create and refine a device query
>>> from cbc_sdk.platform import Device
>>> device_query = api.select(Device).where('status:ACTIVE').set_os(["WINDOWS"])
# now execute it
future = device_query.execute_async()
# await the results
device_list = future.result()
The execute_async()
method returns a standard concurrent.futures.Future
object, and that Future
’s
result()
method will return a list with the results of the query.
Faceting
Facet search queries return statistical information indicating the relative weighting of the requested values as per the specified criteria. Only certain query types support faceting.
Simple Faceting
Simple faceting is built into certain queries, allowing you to generate a summary on certain fields of all objects that
match the query. To perform this, create and refine a query object as you would normally, then call the facets()
method on the query, passing it the names of the fields you want to facet on.
Here is an example for USB devices:
>>> from cbc_sdk.endpoint_standard import USBDevice
>>> usb_devices = api.select(USBDevice).set_statuses(['APPROVED'])
>>> facet_data = usb_devices.facets(['vendor_name', 'product_name'])
This facet query might produce data that looks like this:
[
{
"field": "vendor_name",
"values": [
{
"id": "Generic",
"name": "Generic",
"total": 2
},
{
"id": "Kingston",
"name": "Kingston",
"total": 2
}
]
},
{
"field": "product_name",
"values": [
{
"id": "DataTraveler 3.0",
"name": "DataTraveler 3.0",
"total": 2
},
{
"id": "Mass Storage",
"name": "Mass Storage",
"total": 2
}
]
}
]
Facet Queries
More complex facet queries are performed by creating a query on a facet type, then refining it as usual, then getting the results from the query:
>>> from cbc_sdk.platform import ObservationFacet
>>> query = api.select(ObservationFacet).where(process_pid=1000)
Facet queries have two types of special criteria that may be set. One is the range
type which is used to specify
discrete values (integers or timestamps - specified both as seconds since epoch and also as ISO 8601 strings).
The results are then grouped by occurrence within the specified range:
>>> from cbc_sdk.platform import ObservationFacet
>>> range = {
... "bucket_size": "+1DAY",
... "start": "2020-10-16T00:00:00Z",
... "end": "2020-11-16T00:00:00Z",
... "field": "device_timestamp"
... }
>>> query = api.select(ObservationFacet).where(process_pid=1000).add_range(range)
The range settings are as follows:
field
- the field to return the range for, should be a discrete one (integer or ISO 8601 timestamp)start
- the value to begin grouping atend
- the value to end grouping atbucket_size
- how large of a bucket to group results in. If grouping an ISO 8601 property, use a string like'-3DAYS'
.
Multiple ranges can be configured per query by passing a list of range dictionaries.
The other special criterion that may be set is the term
type, which allows for one or more fields to use as a
criteria on which to return weighted results. Terms may be added using the add_facet_field()
method, specifying
the name of the field to be summarized:
>>> from cbc_sdk.platform import ObservationFacet
>>> query = api.select(ObservationFacet).where(process_pid=1000).add_facet_field("process_name")
Once the facet query has been fully refined, it is executed by examining its results
property:
>>> from cbc_sdk.platform import EventFacet
>>> event_facet_query = api.select(EventFacet).add_facet_field("event_type")
>>> event_facet_query.where(process_guid="WNEXFKQ7-00050603-0000066c-00000000-1d6c9acb43e29bb")
>>> range = {
... "bucket_size": "+1DAY",
... "start": "2020-10-16T00:00:00Z",
... "end": "2020-11-16T00:00:00Z",
... "field": "device_timestamp"
... }
>>> event_facet_query.add_range(range)
>>> synchronous_results = event_facet_query.results
>>> print(synchronous_results)
EventFacet object, bound to https://defense-eap01.conferdeploy.net.
-------------------------------------------------------------------------------
num_found: 16
processed_segments: 1
ranges: [{'start': '2020-10-16T00:00:00Z', 'end': '2020...
terms: [{'values': [{'total': 14, 'id': 'modload', 'na...
total_segments: 1
Facet queries may also be executed asynchronously, as with other asynchronous queries, by calling their
execute_async()
method and then calling the result()
method on the returned Future
object:
>>> from cbc_sdk.platform import EventFacet
>>> event_facet_query = api.select(EventFacet).add_facet_field("event_type")
>>> event_facet_query.where(process_guid="WNEXFKQ7-00050603-0000066c-00000000-1d6c9acb43e29bb")
>>> range = {
... "bucket_size": "+1DAY",
... "start": "2020-10-16T00:00:00Z",
... "end": "2020-11-16T00:00:00Z",
... "field": "device_timestamp"
... }
>>> event_facet_query.add_range(range)
>>> asynchronous_future = event_facet_query.execute_async()
>>> asynchronous_result = asynchronous_future.result()
>>> print(asynchronous_result)
EventFacet object, bound to https://defense-eap01.conferdeploy.net.
-------------------------------------------------------------------------------
num_found: 16
processed_segments: 1
ranges: [{'start': '2020-10-16T00:00:00Z', 'end': '2020...
terms: [{'values': [{'total': 14, 'id': 'modload', 'na...
total_segments: 1
The result for facet queries is a single object with two properties, terms
and ranges
, that contain the facet
search result weighted as per the criteria provided:
>>> print(synchronous_result.terms)
[{'values': [{'total': 14, 'id': 'modload', 'name': 'modload'}, {'total': 2, 'id': 'crossproc', 'name': 'crossproc'}], 'field': 'event_type'}]
>>> print(synchronous_result.ranges)
[{'start': '2020-10-16T00:00:00Z', 'end': '2020-11-16T00:00:00Z', 'bucket_size': '+1DAY', 'field': 'device_timestamp', 'values': None}]
Query Timeouts
Some search queries make use of a timeout value, specified in milliseconds, which may be specified wither through
a timeout
parameter to a method, or via a timeout()
setter method on a query class. These timeouts follow a
specific set of rules.
The absolute maximum timeout value is 300,000 milliseconds (5 minutes). No search may have a timeout longer than this.
An application may specify a shorter maximum timeout value for all searches by including it in the credentials,
under the key name default_timeout
. This default timeout value may not be greater than the absolute maximum
timeout. If this value is specified, no search may have a timeout longer than this value.
This means that, for any given search, the timeout will be the smallest of these values:
The value specified via a parameter to the search, if one was specified.
The value configured in the credentials, if one is so configured.
The absolute maximum timeout value, as defined above.
Search Suggestions
Some classes offer the ability to provide “suggestions” as to search terms that may be employed, via a static method on
the class. Here is an example for Observation
:
>>> from cbc_sdk.platform import Observation
>>> suggestions = Observation.search_suggestions(api, query="device_id", count=2)
>>> for suggestion in suggestions:
... print(suggestion["term"], suggestion["required_skus_all"], suggestion["required_skus_some"])
device_id [] ['threathunter', 'defense']
netconn_remote_device_id ['xdr'] []
And here is an example for BaseAlert
:
>>> from cbc_sdk.platform import BaseAlert
>>> suggestions = BaseAlert.search_suggestions(api, query="device_id")
>>> for suggestion in suggestions:
... print(suggestion["term"], suggestion["required_skus_some"])
device_id ['defense', 'threathunter', 'deviceControl']
device_os ['defense', 'threathunter', 'deviceControl']
[...additional entries elided...]
workload_name ['kubernetesSecurityRuntimeProtection']