Elements API Cookbook
Introduction
Elements API Cookbook contains recipes for integrating applications by using Elements API. Each recipe describes a common use case that you need to solve when implementing an integration using Elements API. Additionally, each recipe contains an example of a solution that is implemented as a procedure in Python.
Recipes
A sample procedure that is implemented in Python is an integral part of each recipe.
Except for the Requests library, functions depend on the modules from the Python Standard Library. The Request library is used for communicating with Elements API.
- Authentication
- Listing devices
- Polling EPP Security Events
- Count Security Events by engine
- Polling EDR detections
Authentication
Prior to making requests to the API integration, you must obtain an authentication token by sending credentials that were
generated within Elements Security Center, defining the desired scope for the requests. If you require read-only access
to the API, you need to set the scope to connect.api.read
. Otherwise, you can obtain an authentication token with the following
scopes: connect.api.read
and connect.api.write
.
The description of each endpoint in the Elements API documentation includes information about the scopes that are required for specific operations.
When you send an authentication request to an endpoint via https://api.connect.withsecure.com/as/token.oauth2
,
the request body must include the following properties:
grant_type=client_credentials
scope=<scopes separated with space>
When you send your credentials in the Authorization
HTTP header, the header’s value comprises your identifier and a secret
value that was generated during the creation of your credentials in the Elements Security Center. To create this value, you must
combine your identifier and secret value using a colon, and then encode the resulting string using the base64 function.
auth_token := base64($CLIENT_ID + ":" + $SECRET_VALUE)
Place the encoded credentials in the Authorization header with the Basic
prefix.
You must send the request body using the application/x-www-form-url
. A successful authentication relies on valid credentials
from the Authorization header and on the client that is allowed to authenticate with the requested scopes. If not, the request will fail.
Regardless of the authentication outcome, Elements API responds in JSON format:
- If authentication is successful, the received JSON object will include the
access_token
property. You can use this token for all subsequent API requests. - If authentication fails, the received JSON object will include an error explanation.
from json import *
from base64 import b64encode
import requests
API_URL = 'https://api.connect.withsecure.com'
def authenticate(client_id, client_secret):
auth_header = b64encode(bytes(client_id + ':' + client_secret, 'utf-8')).decode('utf-8')
headers = {
'Content-type': 'application/x-www-form-urlencoded;charset=UTF-8',
'Accept': 'application/json',
'Authorization': 'Basic ' + auth_header,
'User-Agent': 'my-script' # each request must contain User-Agent header
}
scopes = ['connect.api.read', 'connect.api.write'] # authenticated client can read
# and write data
data = {'grant_type': 'client_credentials', 'scope': ' '.join(scopes)}
response = requests.post(API_URL + '/as/token.oauth2', data=data, headers=headers)
if response.ok:
res_body = response.json()
return res_body['access_token']
else:
print('Response=' + response.text)
print('Headers=' + str(response.headers))
# Each response contains header `X-Transaction`. It can be send to Elements API
# support team in order to help investigation of possible errors
print('Transaction-id=' + response.headers.get('X-Transaction'))
raise Exception('Authentication failed')
Listing all devices
A paging functionality is available in all Elements API endpoints that return extensive lists of items, such as device or incident listings.
This allows you to specify the maximum number of items to be sent in the response. If the total number of items matching the
query criteria exceeds the specified limit, Elements API includes a property called nextAnchor
in the response. This nextAnchor
contains a link to the next batch of items, referred to as the “next page”. If you include this value in the request parameters,
Elements API will return the next page of items. You can repeat this process until nextAnchor
is no longer present in the response.
In this recipe, the function list_all_devices()
reads all devices of a specified organization that have a critical
protection status.
The function sets the limit to 10, which means that the response can contain up to 10 items in the list from the property response.items
.
The listing function ends its iteration when the value for the property response.nextAnchor
is not found.
import requests
from authentication import authenticate
API_URL = "https://api.connect.withsecure.com"
DEVICES_PATH = "/devices/v1/devices"
def get_devices(auth_token,
organization_id=None, next_page=None):
headers = {
"Accept": "application/json", # always use that header if you expect JSON response
"Authorization": "Bearer " + auth_token,
"User-Agent": "my-script"
}
params = {"limit": 10} # use `limit` to define size of single page
params["protectionStatusOverview"] = "critical"
# with parameter `protectionStatusOverview` we can read select devices with given
# protection status.
if organization_id:
# `organization_id` parameter is optional. If authenticated client belongs to
# partner organization, it can use `organization_id` to select devices, that
# belong to given organization
params["organizationId"] = organization_id
if next_page:
params["anchor"] = next_page # add `anchor` to query parameters to read next page
# as value use property `nextAnchor` from last response
# if `nextAnchor` is not present in response it means
# that query returned last page
response = requests.get(API_URL + DEVICES_PATH, params=params, headers=headers)
if not response.ok:
print("Error", response.text)
sys.exit(0)
body = response.json()
items = body["items"] # if any device matching query is found then `items`
# list is empty
for d in items:
print("Device name={}, status={}, company={}".format(d["name"], d["protectionStatusOverview"],
d["company"]["name"]))
if "nextAnchor" in body:
# if `nextAnchor` is present in response, client can use it to read next page
# of query result. For example by calling function
# get_devices_with_status(client_id, client_secret, next_page=response["nextAnchor"])
print("Link to next page=", body["nextAnchor"])
return body["nextAnchor"]
else:
# if `nextAnchor` is missing in response it means, that client received last page
# of query results
print("Last page")
return None
def list_all_devices(client_id, client_secret, organization_id=None):
auth_token = authenticate(client_id, client_secret)
next_page = True
anchor = None
while next_page:
anchor = get_devices(auth_token, organization_id, anchor)
next_page = anchor is not None
Polling Security Events
To maintain an up-to-date list of continuously generated resources, such as
EPP Security Events or
EDR Incidents,
you can use a technique called polling
. Using request parameters to define the time range, you can periodically send a request to Elements API to retrieve all items that were created since the
previous check. In response, Elements API returns only those items that
were created or updated within the specified range.
The function poll_security_events()
periodically retrieves all the events that were generated since the timestamp was saved in the
variable last_date
. When the function is invoked, this variable is initialized with the current time in the UTC time zone.
If the function get_events_after()
finds any EPP Security Event, the last_date
variable is updated with the timestamp from the last event
and is used in the next iteration.
The get_events_after()
function converts last_date
to a string in the following format: YYYY-MM-DDThh:mm:ss.SSS
.
This value is then included in the request as a parameter called persistenceTimestampStart
. Furthermore, the function sets the
exclusiveStart
parameter to true
, signifying that Elements API will return all events with a
persistenceTimestamp
greater than the value specified in persistenceTimestampStart
.
You must always include the exclusiveStart
parameter during polling to prevent duplicate items. If this parameter is omitted,
Elements API will return all items with a persistenceTimestamp
greater than or equal to the value specified in
persistenceTimestampStart
. As a result, you may receive the same event as in the previous iteration.
from datetime import datetime, timezone
from time import sleep
import requests
from authentication import authenticate
API_URL = 'https://api.connect.withsecure.com'
EVENTS_PATH = "/security-events/v1/security-events"
def get_events_page(auth_token, min_date, org_id=None, next_page=None):
headers = {
"Accept": "application/json", # always use that header if you expect JSON response
"Authorization": "Bearer " + auth_token,
"User-Agent": "my-script",
"Content-type": "application/x-www-form-urlencoded"
}
params = {"limit": 200,
"engineGroup": ["epp", "edr"],
"persistenceTimestampStart": min_date, # read all events created AFTER `min_date`
"order": "asc", # ascending order
}
# `min_date` represents timestamp of last event in previous page
# That event should not be included in next response. When `exclusiveStart=true` is set
# then `persistenceTimestampStart` is used as exclusive start of time range.
params["exclusiveStart"] = "true"
if next_page:
params['anchor'] = next_page
if org_id:
params["organizationId"] = org_id
response = requests.post(API_URL + EVENTS_PATH, data=params, headers=headers)
if not response.ok:
print("Error", response.text)
raise Exception("Request error")
return response.json()
def get_events_after(auth_token, last_date, org_id):
# Elements API accepts timestamps in [RFC 3339](https://datatracker.ietf.org/doc/html/rfc3339#section-5.6)
# format: YYYY-MM-DDThh:mm:ss.SSS
last_date_str = last_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
next_page = None
fetch_page = True
print("Reading events created after {}".format(last_date_str))
while fetch_page:
page = get_events_page(auth_token, last_date_str, org_id, next_page)
next_page = page.get("nextAnchor")
# If `nextAnchor` is present we should fetch next page in next iteration
fetch_page = next_page is not None
for event in page["items"]:
print("EventId={}, EventTs={}".format(event["id"],
event["persistenceTimestamp"]))
# updating `last_date` with time when event was persisted
# when iteration ends, `last_date` will be equal to `persistenceTimestamp`
# of last received event
last_date_str = event["persistenceTimestamp"]
# Parse timestamp of last received event after reading all
# events, that were created after `min_dt`
return datetime.strptime(last_date_str, "%Y-%m-%dT%H:%M:%S.%fZ")
def poll_security_events(client_id, client_secret, poll_interval, org_id=None):
# initialize start date with current timestamp
last_date = datetime.now(timezone.utc)
# start infinite loop
while True:
# obtain authentication token
auth_token = authenticate(client_id, client_secret)
# read all events created after `last_date`. After reading all events
# that variable is updated with timestamp of last events. `last_date` will be
# used in next iteration
last_date = get_events_after(auth_token, last_date, org_id)
print("Last date", last_date)
# execute next iteration every `poll_interval`
sleep(poll_interval)
Count Security Events by engine
Except listing Security Events you can also request Elements API to produce data summary. API selects items that are matching parameters, groups them by chosen property and return number of items that belongs to each group. In response API sends list of rows where every item represents one aggregation group.
from datetime import datetime, timezone, timedelta
import requests
from authentication import authenticate
API_URL = "https://api.connect.withsecure.com"
PATH = "/security-events/v1/security-events"
def count_by_engine_last_week(client_id, client_secret, org_id):
end = datetime.now(tz=timezone.utc)
start = end - timedelta(days=7)
stats = count_by_engine(client_id, client_secret, start, end, org_id)
fmt = "|{}|{}|"
print(fmt.format("Engine", "Events count"))
for row in stats["items"]:
print(fmt.format(row["engine"], row["count"]))
def count_by_engine(client_id, client_secret, start_dt, end_dt, org_id=None):
auth_token = authenticate(client_id, client_secret)
headers = {
# when `Accept` header has value `application/vnd.withsecure.aggr+json`
# then API calculates statistics for events matching query parameters
"Accept": "application/vnd.withsecure.aggr+json",
"Authorization": "Bearer " + auth_token,
"User-Agent": "my-script",
"Content-Type": "application/x-www-form-urlencoded",
}
param_start_dt = start_dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
param_end_dt = end_dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
# API selects all security events from `epp` engine group that were persisted
# between `start_dt` and `end_dt`, group items by value of `engine` property
# and return number of items in each group
params = {
"engineGroup": ["epp"],
"persistenceTimestampStart": param_start_dt,
"persistenceTimestampEnd": param_end_dt,
"count": "engine", # property `engine` is used to group
}
if org_id:
params["organizationId"] = org_id
response = requests.post(API_URL + PATH, data=params, headers=headers)
if not response.ok:
print("Error", response.text)
raise Exception("Request error")
return response.json()
In recipe above function count_by_engine
reads security events in given time range that is specified with
query parameters persistenceTimestampStart
and persistenceTimestampEnd
which comes from any
EPP engine. Request should include header Accept: "application/vnd.withsecure.aggr+json"
to indicate that
it expects data summary and uses parameter count: "engine"
to instruct API to group security events by property
engine
.
Function count_by_engine_last_week
gets response from API and iterate over list of items to produce table. Each row
represents different EPP engine which name is visible in first column and number of events from that engine that is printed in
second column.
Polling EDR detections
It is only possible to read a list of EDR detections, for a single EDR incident. To keep the list up-to-date, you have to iterate over the list of open incidents and for each item, read the list of related detections. A long list of incidents will lead to a high number of requests that are sent to Elements API. It may lead to request throttling when the frequency is too high.
To limit the risk of request throttling, we recommend that you query detection only for those incidents that were updated since the previous check. The incidents are mutable entities, which means that their state may change over time, for example, when the system receives a new detection or the status of an incident is updated. Elements API allows you to find incidents that were updated within a specified time range. After getting a list of recent updates, you can use it to fetch last detections.
The function poll_edr_detections()
periodically calls the get_incidents_updated_after
function to retrieve EDR incidents that were
created after last_date
. The last_date
is initialized when the poll_detections()
function is called and it gets updated whenever
the get_incidents_updated_after
function finds a new incident. Whenever a new incident is found, the function print_all_detections
is triggered
to read all the related detections using a paging technique.
from datetime import datetime, timezone
from time import sleep
import requests
from authentication import authenticate
API_URL = 'https://api.connect.withsecure.com'
INCIDENTS_PATH = "/incidents/v1/incidents"
DETECTIONS_PATH = "/incidents/v1/detections"
def get_updated_incidents(auth_token, min_date, org_id=None, next_page=None):
headers = {
"Accept": "application/json", # always use that header if you expect JSON response
"Authorization": "Bearer " + auth_token,
"User-Agent": "my-script"
}
params = {
"limit": 50,
"archived": False, # read only not archived incidents
"updatedTimestampStart": min_date, # read all incidents updated AFTER `min_date`
"order": "asc", # ascending order by `updateTimestampStart`
}
# `min_date` represents timestamp of last event in previous page
# That event should not be included in next response. When `exclusiveStart=true` is set
# then `updatedTimestampStart` is used as exclusive start of time range.
params["exclusiveStart"] = "true"
if next_page:
params['anchor'] = next_page
if org_id:
params["organizationId"] = org_id
response = requests.get(API_URL + INCIDENTS_PATH, params=params, headers=headers)
if not response.ok:
print("Error", response.text)
raise Exception("Request error")
return response.json()
def get_new_detections_page(auth_token, incident_id, min_date, next_page=None):
headers = {
"Accept": "application/json",
"Authorization": "Bearer " + auth_token,
"User-Agent": "my-script"
}
params = {"limit": 100,
"incidentId": incident_id
}
if next_page:
params['anchor'] = next_page
response = requests.get(API_URL + DETECTIONS_PATH, params=params, headers=headers)
if not response.ok:
print("Error", response.text)
raise Exception("Request error")
return response.json()
def print_all_detections(auth_token, incident_id, min_date):
next_page = None
fetch_page = True
while fetch_page:
page = get_new_detections_page(auth_token, incident_id, min_date, next_page)
next_page = page.get("nextAnchor")
fetch_page = next_page is not None
for det in page["items"]:
print("DetectionId={}, IncidentId={}, Created={}".format(det["detectionId"],
det["incidentId"], det["createdTimestamp"]))
def get_incidents_updated_after(auth_token, min_dt, org_id):
# Elements API accepts timestamps in [RFC 3339](https://datatracker.ietf.org/doc/html/rfc3339#section-5.6)
# format: YYYY-MM-DDThh:mm:ss.fff
last_date = min_dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
next_page = None
fetch_page = True
print("Reading incidents updated after {}".format(last_date))
updated_incidents = []
while fetch_page:
page = get_updated_incidents(auth_token, last_date, org_id, next_page)
next_page = page.get("nextAnchor")
# If `nextAnchor` is present we should fetch next page in next iteration
fetch_page = next_page is not None
for incident in page["items"]:
print("IncidentId={}, Updated Ts={}".format(incident["incidentId"],
incident["updatedTimestamp"]))
# updating `last_date` with time when incident was updated
# when iteration ends, `last_date` will be equal to `updatedTimestamp`
# of last received incident
last_date = incident["updatedTimestamp"]
updated_incidents.append((incident["incidentId"], last_date))
# Parse timestamp of last received incident after reading all
# events, that were created after `min_dt`
return {
"last_date": datetime.strptime(last_date, "%Y-%m-%dT%H:%M:%S.%fZ"),
"updated_incidents": updated_incidents
}
def poll_edr_detections(client_id, client_secret, poll_interval, org_id=None):
# initialize start date with current timestamp
last_date = datetime.now(timezone.utc)
# start infinite loop
while True:
# obtain authentication token
auth_token = authenticate(client_id, client_secret)
# read all incidents created after `last_date`. After reading all items
# that variable is updated with timestamp of last incident. `last_date` will be
# used in next iteration
updates = get_incidents_updated_after(auth_token, last_date, org_id)
last_date = updates["last_date"]
for incident in updates["updated_incidents"]:
print_all_detections(auth_token, incident[0], incident[1])
print("Last date", last_date)
# execute next iteration every `poll_interval`
sleep(poll_interval)
Next Steps
- Review the API Reference for more endpoints
- Check out the Getting Started Guide for basic concepts
- Join the Community for support