Elements API Cookbook

Introduction

Elements API Cookbook contains recipes for integrating applications by using Elements API. Each recipe describes a common use case that you need to solve when implementing an integration using Elements API. Additionally, each recipe contains an example of a solution that is implemented as a procedure in Python.

Recipes

A sample procedure that is implemented in Python is an integral part of each recipe.

Apart from the Requests library, the functions depend only on modules from the Python Standard Library. The Requests library is used for communicating with Elements API.

  1. Authentication
  2. Listing devices
  3. Polling EPP Security Events
  4. Count Security Events by engine
  5. Polling EDR detections

Authentication

Before making requests to Elements API, you must obtain an authentication token by sending credentials that were generated in Elements Security Center and defining the desired scope for the requests. If you require read-only access to the API, set the scope to connect.api.read. Otherwise, you can obtain an authentication token with both scopes: connect.api.read and connect.api.write.

The description of each endpoint in the Elements API documentation includes information about the scopes that are required for specific operations.

When you send an authentication request to the endpoint https://api.connect.withsecure.com/as/token.oauth2, the request body must include the following properties:

  • grant_type=client_credentials
  • scope=<scopes separated with space>

When you send your credentials in the Authorization HTTP header, the header’s value comprises your identifier and a secret value that was generated during the creation of your credentials in the Elements Security Center. To create this value, you must combine your identifier and secret value using a colon, and then encode the resulting string using the base64 function.

auth_token := base64($CLIENT_ID + ":" + $SECRET_VALUE)

Place the encoded credentials in the Authorization header with the Basic prefix.
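
The header value and request body described above can be sketched with the standard library alone. This is a minimal illustration, not the official client: the helper name build_token_request and the credentials are placeholders, and nothing is sent over the network.

```python
from base64 import b64encode
from urllib.parse import urlencode


def build_token_request(client_id, client_secret, scopes):
    """Return (headers, body) for a token request; nothing is sent."""
    # Join identifier and secret with a colon, then base64-encode the result.
    credentials = "{}:{}".format(client_id, client_secret).encode("utf-8")
    headers = {
        "Authorization": "Basic " + b64encode(credentials).decode("ascii"),
        "Content-Type": "application/x-www-form-urlencoded",
    }
    # Scopes are sent as a single value, separated with spaces.
    body = urlencode({
        "grant_type": "client_credentials",
        "scope": " ".join(scopes),
    })
    return headers, body


# Placeholder credentials, for illustration only.
headers, body = build_token_request("my-id", "my-secret", ["connect.api.read"])
print(headers["Authorization"])
print(body)
```

Note that urlencode writes the space between two scopes as a plus sign, which is the standard form encoding of a space.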

You must send the request body using the application/x-www-form-urlencoded content type. Authentication succeeds only if the credentials in the Authorization header are valid and the client is allowed to authenticate with the requested scopes. Otherwise, the request fails. Regardless of the authentication outcome, Elements API responds in JSON format:

  • If authentication is successful, the received JSON object will include the access_token property. You can use this token for all subsequent API requests.
  • If authentication fails, the received JSON object will include an error explanation.

from base64 import b64encode

import requests

API_URL = 'https://api.connect.withsecure.com' 

def authenticate(client_id, client_secret):
    auth_header = b64encode(bytes(client_id + ':' + client_secret, 'utf-8')).decode('utf-8')
    headers = {
        'Content-type': 'application/x-www-form-urlencoded;charset=UTF-8',
        'Accept': 'application/json',
        'Authorization': 'Basic ' + auth_header,
        'User-Agent': 'my-script' # each request must contain User-Agent header
    }
    scopes = ['connect.api.read', 'connect.api.write'] # authenticated client can read
                                                       # and write data
    
    data = {'grant_type': 'client_credentials', 'scope': ' '.join(scopes)}
    
    response = requests.post(API_URL + '/as/token.oauth2', data=data, headers=headers)
    if response.ok:
        res_body = response.json() 
        return res_body['access_token']
    else:
        print('Response=' + response.text)
        print('Headers=' + str(response.headers))

        # Each response contains the `X-Transaction` header. It can be sent to the Elements API
        # support team to help investigate possible errors
        print('Transaction-id=' + str(response.headers.get('X-Transaction')))
        raise Exception('Authentication failed')

Listing all devices

A paging functionality is available in all Elements API endpoints that return extensive lists of items, such as device or incident listings. This allows you to specify the maximum number of items to be sent in the response. If the total number of items matching the query criteria exceeds the specified limit, Elements API includes a property called nextAnchor in the response. This nextAnchor contains a link to the next batch of items, referred to as the “next page”. If you include this value in the request parameters, Elements API will return the next page of items. You can repeat this process until nextAnchor is no longer present in the response.

In this recipe, the function list_all_devices() reads all devices of a specified organization that have a critical protection status. The function sets the limit to 10, which means that the response can contain up to 10 items in the list from the property response.items. The listing function ends its iteration when the value for the property response.nextAnchor is not found.
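
The paging loop described above can be isolated from the HTTP details. The following sketch assumes a hypothetical callable fetch_page(anchor) that returns the decoded JSON body of one response. It is exercised here against an in-memory stand-in rather than the real API.

```python
def iterate_pages(fetch_page):
    """Yield every item from a paged listing.

    `fetch_page(anchor)` is a hypothetical callable that returns the decoded
    JSON body of one response: a dict with `items` and, when more pages
    exist, `nextAnchor`.
    """
    anchor = None
    while True:
        body = fetch_page(anchor)
        for item in body.get("items", []):
            yield item
        # A missing `nextAnchor` means the last page has been read.
        anchor = body.get("nextAnchor")
        if anchor is None:
            break


# In-memory stand-in for the API: three pages chained by anchors.
FAKE_PAGES = {
    None: {"items": ["dev-1", "dev-2"], "nextAnchor": "a1"},
    "a1": {"items": ["dev-3", "dev-4"], "nextAnchor": "a2"},
    "a2": {"items": ["dev-5"]},
}

devices = list(iterate_pages(lambda anchor: FAKE_PAGES[anchor]))
print(devices)
```

Writing the loop as a generator lets the caller stop early without fetching the remaining pages.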

import requests
from authentication import authenticate

API_URL = "https://api.connect.withsecure.com"
DEVICES_PATH = "/devices/v1/devices"

def get_devices(auth_token,
                organization_id=None, next_page=None):
    headers = {
        "Accept": "application/json", # always use that header if you expect JSON response
        "Authorization": "Bearer " + auth_token,
        "User-Agent": "my-script"
    }

    params = {"limit": 10} # use `limit` to define size of single page
    
    params["protectionStatusOverview"] = "critical"
    # the `protectionStatusOverview` parameter selects only devices with the given
    # protection status.

    if organization_id:
        # `organization_id` parameter is optional. If authenticated client belongs to
        # partner organization, it can use `organization_id` to select devices, that
        # belong to given organization
        params["organizationId"] = organization_id
        
    if next_page:
        params["anchor"] = next_page # add `anchor` to query parameters to read next page
                                     # as value use property `nextAnchor` from last response
                                     # if `nextAnchor` is not present in response it means
                                     # that query returned last page

    response = requests.get(API_URL + DEVICES_PATH, params=params, headers=headers)

    if not response.ok:
        print("Error", response.text)
        raise Exception("Request error")

    body = response.json()
    items = body["items"] # if no device matching the query is found then `items`
                          # list is empty

    for d in items:
        print("Device name={}, status={}, company={}".format(d["name"], d["protectionStatusOverview"],
                                                             d["company"]["name"]))    
    if "nextAnchor" in body:
        # if `nextAnchor` is present in response, client can use it to read next page
        # of query result. For example by calling function
        # get_devices(auth_token, organization_id, next_page=body["nextAnchor"])
        print("Link to next page=", body["nextAnchor"])
        return body["nextAnchor"]
    else:
        # if `nextAnchor` is missing in response it means, that client received last page
        # of query results
        print("Last page")
        return None

def list_all_devices(client_id, client_secret, organization_id=None):
    auth_token = authenticate(client_id, client_secret)
    next_page = True
    anchor = None
    while next_page:
        anchor = get_devices(auth_token, organization_id, anchor)
        next_page = anchor is not None

Polling Security Events

To maintain an up-to-date list of continuously generated resources, such as EPP Security Events or EDR Incidents, you can use a technique called polling. Using request parameters to define the time range, you can periodically send a request to Elements API to retrieve all items that were created since the previous check. In response, Elements API returns only those items that were created or updated within the specified range.

The function poll_security_events() periodically retrieves all the events that were generated since the timestamp saved in the variable last_date. When the function is invoked, this variable is initialized with the current time in the UTC time zone. If the function get_events_after() finds any EPP Security Event, the last_date variable is updated with the timestamp of the last event and is used in the next iteration.

The get_events_after() function converts last_date to a string in the following format: YYYY-MM-DDThh:mm:ss.SSS. This value is then included in the request as a parameter called persistenceTimestampStart. Furthermore, the function sets the exclusiveStart parameter to true, signifying that Elements API will return all events with a persistenceTimestamp greater than the value specified in persistenceTimestampStart.
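
The timestamp conversion described above can be sketched with the standard library. The helper name to_api_timestamp is hypothetical, and the millisecond precision and trailing Z are assumptions based on the format shown above and on the Z suffix that the recipe code appends. Check the API reference before relying on them.

```python
from datetime import datetime, timedelta, timezone


def to_api_timestamp(dt):
    """Format an aware datetime as a UTC string with millisecond precision."""
    utc = dt.astimezone(timezone.utc)
    # isoformat() ends with "+00:00" for UTC; swap it for the "Z" suffix.
    return utc.isoformat(timespec="milliseconds").replace("+00:00", "Z")


example = datetime(2024, 3, 5, 14, 7, 9, 123456, tzinfo=timezone.utc)
print(to_api_timestamp(example))  # 2024-03-05T14:07:09.123Z

# An input in another time zone is converted to UTC first.
offset = datetime(2024, 3, 5, 16, 7, 9, tzinfo=timezone(timedelta(hours=2)))
print(to_api_timestamp(offset))  # 2024-03-05T14:07:09.000Z
```

The helper expects a timezone-aware datetime. A naive value would be interpreted as local time by astimezone().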

You must always include the exclusiveStart parameter during polling to prevent duplicate items. If this parameter is omitted, Elements API will return all items with a persistenceTimestamp greater than or equal to the value specified in persistenceTimestampStart. As a result, you may receive the same event as in the previous iteration.
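
The effect of exclusiveStart can be illustrated without calling the API. The sketch below mimics the time filter on an in-memory list. The function select_events and the sample events are made up for illustration and do not represent the server implementation.

```python
def select_events(events, start, exclusive_start):
    """Mimic the time-range filter on an in-memory list (illustration only)."""
    if exclusive_start:
        return [e for e in events if e["persistenceTimestamp"] > start]
    return [e for e in events if e["persistenceTimestamp"] >= start]


events = [
    {"id": "e1", "persistenceTimestamp": "2024-03-05T10:00:00.000Z"},
    {"id": "e2", "persistenceTimestamp": "2024-03-05T10:05:00.000Z"},
    {"id": "e3", "persistenceTimestamp": "2024-03-05T10:09:00.000Z"},
]

# The previous poll ended at e2, so its timestamp is the next start value.
last_seen = events[1]["persistenceTimestamp"]

inclusive = [e["id"] for e in select_events(events, last_seen, False)]
exclusive = [e["id"] for e in select_events(events, last_seen, True)]
print(inclusive)  # ['e2', 'e3'] -> e2 is delivered a second time
print(exclusive)  # ['e3']
```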

from datetime import datetime, timezone
from time import sleep

import requests

from authentication import authenticate

API_URL = 'https://api.connect.withsecure.com' 
EVENTS_PATH = "/security-events/v1/security-events"

def get_events_page(auth_token, min_date, org_id=None, next_page=None):
    headers = {
        "Accept": "application/json", # always use that header if you expect JSON response
        "Authorization": "Bearer " + auth_token,
        "User-Agent": "my-script",
        "Content-type": "application/x-www-form-urlencoded"
    }
    
    params = {"limit": 200,
              "engineGroup": ["epp", "edr"],
              "persistenceTimestampStart": min_date, # read all events created AFTER `min_date`
              "order": "asc", # ascending order
    }

    # `min_date` represents timestamp of last event in previous page
    # That event should not be included in next response. When `exclusiveStart=true` is set
    # then `persistenceTimestampStart` is used as exclusive start of time range.
    params["exclusiveStart"] = "true"

    if next_page:
        params['anchor'] = next_page

    if org_id:
        params["organizationId"] = org_id

    response = requests.post(API_URL + EVENTS_PATH, data=params, headers=headers)

    if not response.ok:
        print("Error", response.text)
        raise Exception("Request error")

    return response.json()
    
def get_events_after(auth_token, last_date, org_id):
    # Elements API accepts timestamps in [RFC 3339](https://datatracker.ietf.org/doc/html/rfc3339#section-5.6)
    # format: YYYY-MM-DDThh:mm:ss.SSS
    last_date_str = last_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    next_page = None
    fetch_page = True
    print("Reading events created after {}".format(last_date_str))
    while fetch_page:
        page = get_events_page(auth_token, last_date_str, org_id, next_page)
        next_page = page.get("nextAnchor")

        # If `nextAnchor` is present we should fetch next page in next iteration
        fetch_page = next_page is not None
        for event in page["items"]:
            print("EventId={}, EventTs={}".format(event["id"],
                                                  event["persistenceTimestamp"]))
            # updating `last_date` with time when event was persisted
            # when iteration ends, `last_date` will be equal to `persistenceTimestamp`
            # of last received event
            last_date_str = event["persistenceTimestamp"]

    # Parse timestamp of last received event after reading all
    # events that were created after `last_date`
    return datetime.strptime(last_date_str, "%Y-%m-%dT%H:%M:%S.%fZ")

def poll_security_events(client_id, client_secret, poll_interval, org_id=None):
    # initialize start date with current timestamp
    last_date = datetime.now(timezone.utc)

    # start infinite loop
    while True:
        # obtain authentication token 
        auth_token = authenticate(client_id, client_secret)

        # read all events created after `last_date`. After reading all events
        # that variable is updated with timestamp of last event. `last_date` will be
        # used in next iteration
        last_date = get_events_after(auth_token, last_date, org_id)
        print("Last date", last_date)

        # execute next iteration every `poll_interval`
        sleep(poll_interval)

Count Security Events by engine

Besides listing Security Events, you can also ask Elements API to produce a data summary. The API selects the items that match the request parameters, groups them by the chosen property, and returns the number of items that belong to each group. In the response, the API sends a list of rows in which every item represents one aggregation group.

from datetime import datetime, timezone, timedelta
import requests
from authentication import authenticate

API_URL = "https://api.connect.withsecure.com"
PATH = "/security-events/v1/security-events"

def count_by_engine_last_week(client_id, client_secret, org_id):
    end = datetime.now(tz=timezone.utc)
    start = end - timedelta(days=7)
    stats = count_by_engine(client_id, client_secret, start, end, org_id)
    fmt = "|{}|{}|"
    print(fmt.format("Engine", "Events count"))
    for row in stats["items"]:
        print(fmt.format(row["engine"], row["count"]))

def count_by_engine(client_id, client_secret, start_dt, end_dt, org_id=None):
    auth_token = authenticate(client_id, client_secret)

    headers = {
        # when `Accept` header has value `application/vnd.withsecure.aggr+json`
        # then API calculates statistics for events matching query parameters
        "Accept": "application/vnd.withsecure.aggr+json",
        "Authorization": "Bearer " + auth_token,
        "User-Agent": "my-script",
        "Content-Type": "application/x-www-form-urlencoded",
    }

    param_start_dt = start_dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    param_end_dt = end_dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

    # API selects all security events from `epp` engine group that were persisted
    # between `start_dt` and `end_dt`, group items by value of `engine` property
    # and return number of items in each group
    params = {
        "engineGroup": ["epp"],
        "persistenceTimestampStart": param_start_dt,
        "persistenceTimestampEnd": param_end_dt,
        "count": "engine",  # property `engine` is used to group
    }

    if org_id:
        params["organizationId"] = org_id

    response = requests.post(API_URL + PATH, data=params, headers=headers)
    if not response.ok:
        print("Error", response.text)
        raise Exception("Request error")
    return response.json()

In the recipe above, the function count_by_engine reads security events that come from any EPP engine and were persisted within the time range specified by the query parameters persistenceTimestampStart and persistenceTimestampEnd. The request should include the header Accept: "application/vnd.withsecure.aggr+json" to indicate that it expects a data summary, and it uses the parameter count: "engine" to instruct the API to group security events by the engine property.

The function count_by_engine_last_week gets the response from the API and iterates over the list of items to produce a table. Each row represents a different EPP engine: the first column shows the engine name and the second column shows the number of events from that engine.
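
The table-building step can be tried without a network call. The sketch below uses a hypothetical response body with the same shape that the recipe reads (an items list whose rows carry engine and count). The engine names and numbers are made up, and sorting by count is an addition for readability.

```python
def format_engine_counts(stats):
    """Return table lines for an aggregated response, largest group first."""
    rows = sorted(stats["items"], key=lambda row: row["count"], reverse=True)
    lines = ["|Engine|Events count|"]
    for row in rows:
        lines.append("|{}|{}|".format(row["engine"], row["count"]))
    return lines


# Hypothetical response body; engine names and counts are invented.
sample = {"items": [
    {"engine": "engineA", "count": 3},
    {"engine": "engineB", "count": 12},
]}
print("\n".join(format_engine_counts(sample)))
```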

Polling EDR detections

EDR detections can be read only for a single EDR incident at a time. To keep the list up to date, you have to iterate over the list of open incidents and, for each one, read the list of related detections. A long list of incidents leads to a high number of requests sent to Elements API, which may result in request throttling when the frequency is too high.

To limit the risk of request throttling, we recommend that you query detections only for those incidents that were updated since the previous check. Incidents are mutable entities, which means that their state may change over time, for example, when the system receives a new detection or the status of an incident is updated. Elements API allows you to find incidents that were updated within a specified time range. After getting a list of recent updates, you can use it to fetch the latest detections.
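
As a complement to sending fewer requests, a client can also wait and retry when a request is throttled. The sketch below is a generic retry helper with exponential backoff. It assumes that throttling is signalled with HTTP status 429, which this cookbook does not confirm, and the names call_with_backoff and FakeResponse are hypothetical.

```python
import time


def call_with_backoff(send, max_attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call `send()` until it is not throttled or attempts run out.

    `send` is a hypothetical zero-argument callable returning an object with
    a `status_code` attribute (for example a `requests.Response`).
    ASSUMPTION: throttling is signalled with HTTP status 429.
    """
    response = None
    for attempt in range(max_attempts):
        response = send()
        if response.status_code != 429:
            return response
        if attempt < max_attempts - 1:
            # Wait 1x, 2x, 4x, ... the base delay before the next attempt.
            sleep(base_delay * (2 ** attempt))
    return response


class FakeResponse:
    """Stand-in for an HTTP response, used only in this illustration."""

    def __init__(self, status_code):
        self.status_code = status_code


# Simulated API: throttled twice, then successful. Delays are recorded
# instead of slept so that the example finishes immediately.
replies = iter([FakeResponse(429), FakeResponse(429), FakeResponse(200)])
delays = []
result = call_with_backoff(lambda: next(replies), sleep=delays.append)
print(result.status_code, delays)
```

With the Requests library, the helper could wrap a call such as lambda: requests.get(url, params=params, headers=headers).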

The function poll_edr_detections() periodically calls the get_incidents_updated_after function to retrieve EDR incidents that were updated after last_date. The last_date is initialized when the poll_edr_detections() function is called, and it is updated whenever the get_incidents_updated_after function finds an updated incident. For every incident found, the function print_all_detections is called to read all the related detections using paging.

from datetime import datetime, timezone
from time import sleep
import requests
from authentication import authenticate

API_URL = 'https://api.connect.withsecure.com' 
INCIDENTS_PATH = "/incidents/v1/incidents"
DETECTIONS_PATH = "/incidents/v1/detections"

def get_updated_incidents(auth_token, min_date, org_id=None, next_page=None):
    headers = {
        "Accept": "application/json", # always use that header if you expect JSON response
        "Authorization": "Bearer " + auth_token,
        "User-Agent": "my-script"
    }
    
    params = {
        "limit": 50,
        "archived": "false", # read only not archived incidents
        "updatedTimestampStart": min_date, # read all incidents updated AFTER `min_date`
        "order": "asc", # ascending order by `updatedTimestamp`
    }

    # `min_date` represents timestamp of last incident in previous page
    # That incident should not be included in next response. When `exclusiveStart=true` is set
    # then `updatedTimestampStart` is used as exclusive start of time range.
    params["exclusiveStart"] = "true"

    if next_page:
        params['anchor'] = next_page

    if org_id:
        params["organizationId"] = org_id

    response = requests.get(API_URL + INCIDENTS_PATH, params=params, headers=headers)

    if not response.ok:
        print("Error", response.text)
        raise Exception("Request error")

    return response.json()

def get_new_detections_page(auth_token, incident_id, min_date, next_page=None):
    headers = {
        "Accept": "application/json",
        "Authorization": "Bearer " + auth_token,
        "User-Agent": "my-script"
    }
    
    params = {"limit": 100,
              "incidentId": incident_id 
    }

    if next_page:
        params['anchor'] = next_page
        
    response = requests.get(API_URL + DETECTIONS_PATH, params=params, headers=headers)

    if not response.ok:
        print("Error", response.text)
        raise Exception("Request error")

    return response.json()

def print_all_detections(auth_token, incident_id, min_date):
    next_page = None
    fetch_page = True
    while fetch_page:
        page = get_new_detections_page(auth_token, incident_id, min_date, next_page)
        next_page = page.get("nextAnchor")
        fetch_page = next_page is not None
        for det in page["items"]:
            print("DetectionId={}, IncidentId={}, Created={}".format(det["detectionId"],
                                                       det["incidentId"], det["createdTimestamp"]))

def get_incidents_updated_after(auth_token, min_dt, org_id):
    # Elements API accepts timestamps in [RFC 3339](https://datatracker.ietf.org/doc/html/rfc3339#section-5.6)
    # format: YYYY-MM-DDThh:mm:ss.fff
    last_date = min_dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    next_page = None
    fetch_page = True
    print("Reading incidents updated after {}".format(last_date))
    updated_incidents = []
    while fetch_page:
        page = get_updated_incidents(auth_token, last_date, org_id, next_page)
        next_page = page.get("nextAnchor")

        # If `nextAnchor` is present we should fetch next page in next iteration
        fetch_page = next_page is not None
        for incident in page["items"]:
            print("IncidentId={}, Updated Ts={}".format(incident["incidentId"],
                                                  incident["updatedTimestamp"]))
            # updating `last_date` with time when incident was updated
            # when iteration ends, `last_date` will be equal to `updatedTimestamp`
            # of last received incident
            last_date = incident["updatedTimestamp"]
            updated_incidents.append((incident["incidentId"], last_date))

    # Parse timestamp of last received incident after reading all
    # incidents that were updated after `min_dt`
    return {
        "last_date": datetime.strptime(last_date, "%Y-%m-%dT%H:%M:%S.%fZ"),
        "updated_incidents": updated_incidents
    }
        
def poll_edr_detections(client_id, client_secret, poll_interval, org_id=None):
    # initialize start date with current timestamp
    last_date = datetime.now(timezone.utc)

    # start infinite loop
    while True:
        # obtain authentication token 
        auth_token = authenticate(client_id, client_secret)

        # read all incidents updated after `last_date`. After reading all items
        # that variable is updated with timestamp of last incident. `last_date` will be
        # used in next iteration
        updates = get_incidents_updated_after(auth_token, last_date, org_id)
        last_date = updates["last_date"]
        for incident in updates["updated_incidents"]:
            print_all_detections(auth_token, incident[0], incident[1])
        print("Last date", last_date)
        # execute next iteration every `poll_interval`
        sleep(poll_interval)
