


Link

Assuming that you have read the Authentication Quickstart, you may start linking your company data with S&P Global's Market Intelligence or CapitalIQ data.

Concepts

The Link API is a machine learning service that maps your entities to identifiers in S&P's knowledge bases. For example, given a company name in your database, you can get a unique company identifier from the S&P Capital IQ or S&P Capital IQ Pro database.

Entity Type: A real world concept like a company.

Entity: A single specific real world thing.

Example: "S&P" and "Kensho" are Entities both with an Entity Type of "Company"

Records:

Each Record is a collection of details currently known about an Entity. These details are used to find a corresponding Identifier in an S&P knowledge base.

The only required fields in a Record are uid and name. All fields other than uid are features used by the model in predicting an Entity Link, with each feature's weight determined by the model (an example record is sketched after the list below):

  • uid: A unique identifier for each input record. Used in the output to show which linked entity corresponds to which input record.
  • name: Text representing the name of the entity.
  • aliases: Other names the entity might go by.
  • address or (address_1, address_2): One or more addresses associated with the entity.
  • city: A city associated with the entity.
  • state or state_abbreviation: A state associated with the entity.
  • zipcode: A zipcode associated with the entity.
  • country or country_iso3 or country_iso2: A country associated with the entity.
  • phone_number: A phone number associated with the entity.
  • url: A URL associated with the entity.
  • year_founded: The year the entity was founded.
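For example, a minimal Record might look like the following sketch (the values are illustrative; only uid and name are required, and while the on-demand JSON format takes aliases as a list, the .csv file format uses a pipe-separated string):

record = {
    "uid": "1",                     # required: unique per input record
    "name": "Kensho Technologies",  # required: the entity's name
    "aliases": ["Kensho"],          # optional: alternate names
    "city": "Cambridge",
    "state_abbreviation": "MA",
    "country_iso3": "USA",
}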

Entity Link: Returned by the Link API. Contains the Identifier from the S&P knowledge base along with a Link Score. Optionally contains any additionally requested knowledge base information about the entity.

Score: The Link Score for a given Entity Link corresponds to the quality of the match to a specific Entity in an S&P knowledge base. It should not be interpreted as an estimated probability. However, it can be used to rank multiple matches for a record.
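For instance, when more than one candidate is requested (num_top_records > 1), the links returned for a record can simply be sorted by score. A minimal sketch, using illustrative values:

# Rank candidate links for a single record by their Link Score, highest first.
# The values below are illustrative; real scores come from the API response.
links = [
    {"sp_company_id": "7642076", "link_score": 0.2204},
    {"sp_company_id": "21719", "link_score": 0.9941},
]
ranked = sorted(links, key=lambda link: link["link_score"], reverse=True)
best_match = ranked[0]  # the highest-scoring candidate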

The Link File endpoint supports requests for linking a .csv file of properly formatted company information.

Sample Usage

Input File

Valid fields to be included in an input .csv file are as follows:

  • uid: str [required]
  • name: str [required]
  • aliases: Pipe-separated str
  • address or (address_1, address_2): str
  • city: str
  • state or state_abbreviation: str
  • zipcode: str
  • country or country_iso3 or country_iso2: str
  • phone_number: str
  • url: str
  • year_founded: int

The following is a sample input file:

uid,name,aliases,address,city,state_abbreviation,zipcode,country_iso3
1,Kensho Technologies,Kensho,44 Brattle St,Cambridge,MA,,USA
2,S&P Global,,55 Water St,New York,NY,10038,
3,Ratings,,,,,,USA
4,Apple,,,Cupertino,CA,,
5,Aramco,Saudi Aramco|Saudi Arabian Oil Company,,,,,SAU
6,Tencent,,,,,,CHN
7,Samsung,,,,,,KOR
8,Barclays Egypt,,,,,,
9,Roche,,,,,,
10,Toyota,,1 Toyota-Cho,Toyota City,,,JPN

Request

The following code snippet shows how to submit a file linking request and download its output using Kensho's file linking client.

from datetime import datetime, timedelta
import logging
import io
import os
import tempfile
import time
from typing import Any, Dict, List, Tuple, Optional
import zipfile

import requests

logger = logging.getLogger(__name__)

# Polling statuses
JOB_SUCCESS = "SUCCESS"
JOB_ERROR = "ERROR"
POLLING_TIMEOUT = "TIMEOUT"

# Extra fields
NAME = "name"
ADDRESS = "address"
CITY = "city"
STATE = "state"
ZIPCODE = "zipcode"
COUNTRY = "country_iso3"
ALIASES = "aliases"
PHONE_NUMBER = "phone_number"
URL = "url"
YEAR_FOUNDED = "year_founded"
AVAILABLE_EXTRA_FIELDS = {
    NAME,
    ADDRESS,
    CITY,
    STATE,
    ZIPCODE,
    COUNTRY,
    ALIASES,
    PHONE_NUMBER,
    URL,
    YEAR_FOUNDED,
}

# Knowledge bases
MI = "mi"
CAPIQ = "capiq"
AVAILABLE_KNOWLEDGE_BASES = [MI, CAPIQ]


# Helper function to upload a file to AWS S3
def _upload_file(path_to_file: str, aws_info: Dict[str, Any]) -> None:
    with tempfile.NamedTemporaryFile(mode="w+b") as file_out:
        with zipfile.ZipFile(file_out, mode="w") as file_out_zip:
            file_out_zip.write(path_to_file)
        file_out.flush()
        file_out.seek(0)
        files = {"file": file_out}
        requests.post(aws_info["url"], data=aws_info["fields"], files=files)


# Helper function to download the output file
def _download_output(response: requests.Response, output_directory_path: str) -> None:
    with zipfile.ZipFile(io.BytesIO(response.content)) as zfile:
        csv_filepath = zfile.namelist()[0]
        with zfile.open(csv_filepath) as csv_file:
            csv_filename = os.path.basename(csv_filepath)
            download_location = os.path.join(output_directory_path, csv_filename)
            csv_content = csv_file.read()
            with open(download_location, "wb") as f:
                f.write(csv_content)


# Helper function to catch and re-raise HTTP errors
def _catch_reraise_error(response: requests.models.Response) -> None:
    try:
        response.raise_for_status()
    except requests.HTTPError as e:
        if response.text:
            raise requests.HTTPError(f"{e.args[0]}. Error Message: {response.text}") from None
        else:
            raise e from None


class LinkFileClient:
    """A class to call the Link File API that automatically refreshes tokens when needed."""

    def __init__(self, link_host: str, refresh_token: str, access_token: Optional[str] = None):
        self._link_host = link_host
        self._refresh_token = refresh_token
        self.access_token = access_token
        logger.setLevel(logging.INFO)

    def update_access_token(self):
        response = requests.get(f"{self._link_host}/oauth2/refresh?refresh_token={self._refresh_token}")
        if not response.ok:
            raise RuntimeError(
                "Something went wrong when trying to use Link. Is your refresh token correct?"
            )
        access_token = response.json()["access_token"]
        self.access_token = access_token

    def call_api(self, verb, *args, headers={}, **kwargs):
        """Call Link API, refreshing access token as needed."""
        if not self.access_token:
            self.update_access_token()

        def call_with_updated_headers(request_method):
            headers["Authorization"] = f"Bearer {self.access_token}"
            return request_method(*args, headers=headers, **kwargs)

        method = getattr(requests, verb)
        response = call_with_updated_headers(method)
        if response.status_code == 403:
            self.update_access_token()
            response = call_with_updated_headers(method)
        return response

    def start_job(
        self,
        job_name: str,
        path_to_file: str,
        knowledge_base: str,
        extra_fields: Optional[List[str]] = None,
        country_filter: bool = False,
        num_top_records: int = 1,
        include_identifiers: Optional[List[str]] = None,
        get_ultimate_parent: bool = False,
        include_input_fields: bool = False,
        include_score_label: bool = False,
    ) -> Optional[str]:
        """Start a new linking job.

        Args:
            job_name: Name given to the job.
            path_to_file: Path to the input csv file.
            knowledge_base: Name of knowledge base to link against.
            extra_fields: Extra response fields to output.
            country_filter: Whether to filter by a given country.
            num_top_records: Maximum number of top records for each link.
            include_identifiers: Cross-reference ID types to output.
            get_ultimate_parent: Whether to fetch the Ultimate Parent Company.
            include_input_fields: Whether to include input fields in output file.

        Returns:
            UUID of the job if it is started successfully, otherwise None.
        """
        extra_fields = extra_fields or []
        if not set(extra_fields).issubset(AVAILABLE_EXTRA_FIELDS):
            raise ValueError(
                f"Extra fields {set(extra_fields) - AVAILABLE_EXTRA_FIELDS} not allowed."
            )
        if knowledge_base not in AVAILABLE_KNOWLEDGE_BASES:
            raise ValueError(
                "Invalid knowledge base. "
                f"Knowledge base should be one of {AVAILABLE_KNOWLEDGE_BASES}"
            )
        file_name = os.path.basename(path_to_file)
        create_job_request_data = {
            "job_name": job_name,
            "file_name": file_name,
            "knowledge_base": knowledge_base,
            "model_name": "generic",
            "num_top_records": num_top_records,
            "country_filter": country_filter,
            "extra_fields": extra_fields,
            "include_identifiers": include_identifiers,
            "get_ultimate_parent": get_ultimate_parent,
            "include_input_fields": include_input_fields,
        }
        logger.info("Creating job with following params: %s", str(create_job_request_data))
        create_response = self.call_api(
            "post",
            f"{self._link_host}/api/linkfile/v0/create",
            json=create_job_request_data,
            headers={"Content-Type": "application/json"},
        )
        _catch_reraise_error(create_response)
        job_info = create_response.json()
        job_id = job_info["job_id"]
        logger.info("Uploading file: %s", path_to_file)
        aws_info = job_info["aws_upload_url"]
        _upload_file(path_to_file, aws_info)
        logger.info("Starting linking job with id: %s", job_id)
        start_response = self.call_api(
            "get",
            f"{self._link_host}/api/linkfile/v0/start/{job_id}",
            headers={"Content-Type": "application/json"},
        )
        _catch_reraise_error(start_response)
        return job_id

    def get_status(self, job_id: str) -> Dict[str, Any]:
        """Get status of the job.

        Args:
            job_id: UUID of the job.

        Returns:
            A dict containing the current status as well as other information about the job.
        """
        response = self.call_api(
            "get",
            f"{self._link_host}/api/linkfile/v0/status/job/{job_id}",
            headers={"Content-Type": "application/json"},
        )
        _catch_reraise_error(response)
        return response.json()

    def poll_for_completion(
        self, job_id: str, interval: float = 30.0, timeout: Optional[float] = None
    ) -> Tuple[str, Optional[str]]:
        """Poll the server until the job is complete.

        Args:
            job_id: UUID of the job.
            interval: Polling interval in seconds.
            timeout: Timeout for when the polling loop exits.

        Returns:
            A tuple of the final status and an error message (or None).
        """
        poll_timeout_time = datetime.now() + timedelta(seconds=timeout) if timeout else None
        while True:
            status = self.get_status(job_id)
            logger.info("%s Current status: %s", datetime.now(), status["status"])
            if status["status"] == JOB_SUCCESS:
                logger.info("Job completed successfully. Output file is ready to be downloaded.")
                return JOB_SUCCESS, None
            elif status["status"] == JOB_ERROR:
                logger.info("Job failed.")
                return JOB_ERROR, status["message"]
            time.sleep(interval)
            if poll_timeout_time is not None and datetime.now() >= poll_timeout_time:
                logger.info("Polling timed out")
                return POLLING_TIMEOUT, None

    def download_job_output(self, job_id: str, output_directory_path: str) -> None:
        """Download the output of the linking job.

        Args:
            job_id: UUID of the job.
            output_directory_path: Path to the directory where the output file should be downloaded.
        """
        download_response = self.call_api(
            "get",
            f"{self._link_host}/api/linkfile/v0/download-url/{job_id}",
            headers={"Content-Type": "application/json"},
        )
        _catch_reraise_error(download_response)
        download_response_json = download_response.json()
        presigned_url = download_response_json["file_path"]
        response = requests.get(presigned_url)
        _download_output(response, output_directory_path)
        logger.info("Output file has been successfully downloaded.")

    def ping_server(self) -> None:
        """Ping the server to test the readiness of the server."""
        response = requests.get(f"{self._link_host}/statusz")
        response.raise_for_status()
        logger.info("Link server at %s is ready to receive requests.", self._link_host)


# Endpoint and auth info
LINK_HOST = "https://api.link.kensho.com"
ACCESS_TOKEN = "<token obtained from login>"
REFRESH_TOKEN = "<token obtained from login>"

# Input file and output directory paths
input_file_path = "path_to_input_csv_file"
output_directory_path = "path_to_output_directory"

logging.basicConfig(level=logging.INFO)

client = LinkFileClient(link_host=LINK_HOST, refresh_token=REFRESH_TOKEN, access_token=ACCESS_TOKEN)
job_id = client.start_job(
    "Test Job",
    input_file_path,
    CAPIQ,
    extra_fields=[NAME],
    country_filter=False,
    num_top_records=1,
    include_identifiers=[],
    get_ultimate_parent=False,
    include_input_fields=False,
)
final_status, error_msg = client.poll_for_completion(job_id)
if final_status == JOB_SUCCESS:
    client.download_job_output(job_id, output_directory_path)
elif final_status == JOB_ERROR:
    logger.info("Error: %s", error_msg)

Output File

Below is the expected output file when the sample file above is used as input and the file linking job is started as in the preceding code snippet.

uid,sp_company_id,link_score,input_name,name
1,251994106,99.96,Kensho Technologies,"Kensho Technologies, Inc."
2,21719,99.41,S&P Global,S&P Global Inc.
3,7642076,22.04,Ratings,S&P Global Ratings Inc.
4,24937,99.69,Apple,Apple Inc.
5,1241120,99.07,Aramco,Saudi Arabian Oil Company
6,11042136,99.77,Tencent,Tencent Holdings Limited
7,91868,98.44,Samsung,"Samsung Electronics Co., Ltd."
8,13401047,97.14,Barclays Egypt,Attijariwafa bank Egypt S.A.E.
9,687140,99.39,Roche,Roche Holding AG
10,319676,98.65,Toyota,Toyota Motor Corporation
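The downloaded file can then be post-processed like any other .csv, for example to keep only links above a score threshold. The snippet below is a minimal sketch; the file path and threshold are illustrative:

import csv

# Keep only output rows whose link score clears an illustrative threshold of 90.
with open("path_to_output_directory/output.csv", newline="") as f:
    high_confidence = [
        row for row in csv.DictReader(f) if float(row["link_score"]) >= 90
    ]

for row in high_confidence:
    print(row["uid"], row["sp_company_id"], row["name"])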

On-Demand

The following endpoint can be used for matching company data on-demand:

  • /api/ondemand/v0/companies/<knowledge_base>/<model_name>

This on-demand endpoint asks the user to provide a knowledge base and model name in the URI.

  • knowledge_base is either "mi" or "capiq".
    • "mi" allows the user to link against the Market Intelligence dataset
    • "capiq" allows the user to link against the CapitalIQ dataset.
  • model_name (at this time) must be "generic".

A JSON request body should be provided with the following format:

{
    "num_top_records": Optional[int] = 1,
    "records": [{
        "uid": str,
        "name": str,
        "aliases": Optional[List[str]] = [],
        "country_iso3": Optional[str] = None,
        "address": Optional[str] = None,
        "state": Optional[str] = None,
        "city": Optional[str] = None,
        "zipcode": Optional[str] = None,
        "year_founded": Optional[int] = None,
        "url": Optional[str] = None,
        "phone_number": Optional[str] = None,
    }]
}

Note: Unlike the link file endpoint, the on-demand endpoint only accepts the country_iso3 field. It will ignore any country or country_iso2 fields.
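If your data only contains ISO2 country codes, convert them to ISO3 before calling the on-demand endpoint. A minimal sketch, assuming the third-party pycountry package is installed:

import pycountry  # third-party package; not required by the Link API itself

def iso2_to_iso3(iso2_code: str) -> str:
    """Convert an ISO 3166-1 alpha-2 code (e.g. "US") to alpha-3 (e.g. "USA")."""
    country = pycountry.countries.get(alpha_2=iso2_code.upper())
    if country is None:
        raise ValueError(f"Unknown ISO2 country code: {iso2_code}")
    return country.alpha_3

print(iso2_to_iso3("us"))  # USA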

Sample Usage

Request

The following example links companies to the CapitalIQ dataset with the "generic" model.

The URI indicates the knowledge base (capiq) and model name (generic):

https://api.link.kensho.com/api/ondemand/v0/companies/capiq/generic

The JSON body requests links for the company "S&P Global". num_top_records indicates how many candidate records to return for each company. In this example all supported fields are provided, although only the required fields (uid and name) are needed.

{
    "records": [
        {
            "uid": "1",
            "name": "S&P Global",
            "aliases": ["S&P", "SPGI"],
            "country_iso3": "USA",
            "address": "55 Water Street",
            "state": "New York",
            "city": "New York City",
            "zipcode": "10041",
            "year_founded": 1860,
            "url": "www.spglobal.com",
            "phone_number": "(212) 438-1000"
        }
    ],
    "include_response_fields": ["name"],
    "num_top_records": 1
}

Response

The output of the endpoint contains metadata that describes the entity type, knowledge base, model name and model version used for the request. The output for each linked record contains the S&P knowledge base ID and the associated link score. The results for each record are returned in descending order of the link_score field.

{
    "entity_type": "companies",
    "knowledge_base": "capiq",
    "model_name": "generic",
    "records": [
        {
            "input_name": "S&P Global",
            "links": [
                {
                    "name": "S&P Global Inc.",
                    "sp_company_id": "21719",
                    "link_score": 0.991182643610278
                }
            ],
            "num_links": 1,
            "uid": "1"
        }
    ]
}

The following is an example of how to interact with the API using Python:

import json
import requests

LINK_URL = 'https://api.link.kensho.com/api/ondemand/v0/companies/capiq/generic'

request_json = {
    "records": [
        {
            "uid": "1",
            "name": "S&P Global",
            "aliases": ["S&P", "SPGI"],
            "country_iso3": "USA",
            "address": "55 Water Street",
            "state": "New York",
            "city": "New York City",
            "zipcode": "10041",
            "year_founded": 1860,
            "url": "www.spglobal.com",
            "phone_number": "(212) 438-1000"
        }
    ],
    "include_response_fields": ["name"],
    "num_top_records": 1
}

response = requests.post(
    LINK_URL,
    json=request_json,
    headers={
        'Content-Type': 'application/json',
        'Authorization': 'Bearer <token obtained from login>',
    },
)
linked_results = response.json()
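Continuing from the snippet above, the linked results can be read directly from the parsed JSON; the field names follow the sample response shown earlier:

# Print the best link for each input record (links are returned in descending score order).
for record in linked_results["records"]:
    if record["num_links"] == 0:
        print(record["uid"], "no link found")
        continue
    best = record["links"][0]
    print(record["uid"], best["sp_company_id"], best["link_score"], best.get("name"))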

BECRS Cross-reference ID Lookup

If the requester has BECRS entitlements, they can add include_identifiers to the request. Link will perform the cross-reference lookup through BECRS and add the results to the response. If include_identifiers is added to the request, the ultimate parent information for each company may also be fetched; this can be toggled through the get_ultimate_parent boolean field.

The following example request asks for the DUNS and SNL identifier types as well as the ultimate parent company info:

{
    "num_top_records": 1,
    "records": [
        {
            "uid": "1",
            "name": "Kensho"
        }
    ],
    "include_response_fields": ["name"],
    "include_identifiers": ["DUNS", "SNL"],
    "get_ultimate_parent": true
}

The corresponding response will have fields for the requested identifiers added to the response:

{
    "entity_type": "companies",
    "knowledge_base": "capiq",
    "model_name": "generic",
    "records": [
        {
            "input_name": "Kensho",
            "links": [
                {
                    "name": "Kensho Technologies, Inc.",
                    "sp_company_id": "251994106",
                    "link_score": 0.9933911614820168,
                    "DUNS": ["079246675"],
                    "SNL": ["5269941"],
                    "ultimate_parent_company": {
                        "name": "S&P Global Inc.",
                        "id": "21719"
                    }
                }
            ],
            "num_links": 1,
            "uid": "1"
        }
    ]
}

Note: The cross-reference ID fields are lists of IDs since BECRS can return multiple cross-reference IDs for the same knowledge base ID. In addition, BECRS cross-reference lookups are currently only available when linking with the CapIQ knowledge base.
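Because each identifier field is a list, downstream code should handle zero, one, or several IDs per link. Below is a minimal sketch that collects every DUNS number from a parsed response (response_json is assumed to hold JSON shaped like the example above):

# Flatten all DUNS numbers returned across records and links.
duns_numbers = [
    duns
    for record in response_json["records"]
    for link in record["links"]
    for duns in link.get("DUNS", [])
]
print(duns_numbers)  # e.g. ["079246675"]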

On-Demand is limited to 100 records in a single request. There are plans to support a larger number of records in the future. For now, the following snippet can be used to convert the JSON body of an On-Demand request to an input file usable by the Link File endpoint.

import csv
import json
import uuid
from pathlib import Path


def write_ondemand_body_to_csv(json_body: str, output_directory: str) -> str:
    """Converts the JSON for an on-demand request to a file usable with the File Linking endpoint.

    Args:
        json_body: The POST body of the on-demand endpoint.
        output_directory: The directory to write the output file to.

    Returns:
        The concrete path for the file that was created.
    """
    raw_json = json.loads(json_body)
    filename = f"records-{uuid.uuid4()}.csv"
    concrete_path = Path(output_directory) / filename
    fieldnames = (
        "uid", "name", "aliases", "country_iso3", "address", "state",
        "city", "zipcode", "year_founded", "url", "phone_number",
    )
    with concrete_path.open("w", newline="") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for record in raw_json["records"]:
            if "aliases" in record:
                record["aliases"] = "|".join(record["aliases"])
            writer.writerow(record)
    return str(concrete_path)

Note that the snippet does not check whether the directory exists or whether the JSON is formatted correctly.
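For example, the helper above might be used as follows (a hypothetical usage sketch; the output directory name is illustrative and is created first because the helper does not create it):

import json
from pathlib import Path

request_body = json.dumps({
    "num_top_records": 1,
    "records": [
        {"uid": "1", "name": "Kensho", "aliases": ["Kensho Technologies"]},
        {"uid": "2", "name": "S&P Global", "country_iso3": "USA"},
    ],
})

output_dir = "link_inputs"  # illustrative directory name
Path(output_dir).mkdir(parents=True, exist_ok=True)
csv_path = write_ondemand_body_to_csv(request_body, output_dir)
print(csv_path)  # e.g. link_inputs/records-<uuid>.csv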