This article will describe each authentication API and demonstrate how developers can utilize their functionality in a brief code example.
All owner-level API requests in the /rest section of the API documentation require session authentication with a fresh bearer token to enforce password checks before executing API requests. Please review the VMS Server API documentation for more information on how to properly utilize parameters and execute the API requests.
HTTP Bearer/Session Token Authentication
Nx Witness 5.0 introduced HTTP Bearer/Session Token Authentication, an improved authentication method recommended over the deprecated methods from prior versions of Nx Witness.
A bearer token is a cryptic string generated by Nx Witness Server in response to a login request. An Nx Witness client must send this token in the authorization header when making requests to protected resources. This method can only be used over HTTPS or RTSPS. Raw HTTP or RTSP requests cannot be made.
Local and LDAP Users
The following steps demonstrate how to authenticate API requests for Local and LDAP users:
- [Optional] Check user type on the VMS Server to confirm they are not a Cloud user.
- Execute a login request to the VMS Server to obtain two bearer tokens: one to initiate the API call and another to delete the first. For cookie-based authentication, setCookie should be set to true. Afterward, all web browser requests to the VMS Server will be authorized automatically.
- [Optional] Check if the bearer token is valid on the VMS Server.
- Execute any request which requires authentication with a bearer token on the VMS Server.
Note: Requests involving Video Walls require the Video Wall ID in the request. See the API documentation for more details.
- Terminate a session that is no longer needed on the VMS Server.
Click to see the code example
import requests
from pprint import pprint
# Connection settings for the local-user example; edit them to match your VMS Server.
LOCAL_USER = 'admin' # local account username
LOCAL_PASSWORD = 'pass123' # local account password
LOCAL_URL = 'https://localhost:7001' # https://<server_ip>:<server_port> or https://{system_id}.relay.vmsproxy.com
def check_status(request, verbose):
    """Report whether an HTTP response came back with status 200 (OK).

    The body of a successful response is echoed only when ``verbose`` is
    truthy; for any other status the URL, status code and body are always
    printed.

    :param request: response object (despite the name) exposing
        ``status_code``, ``text`` and ``url``.
    :param verbose: print the body of successful responses when truthy.
    :return: ``True`` for a 200 response, ``False`` otherwise.
    """
    succeeded = request.status_code == requests.codes.ok
    if not succeeded:
        print(request.url + " Request error {0}\n{1}".format(request.status_code, request.text))
        return False
    if verbose:
        print("Request successful\n{0}".format(request.text))
    return True
def request_api(url, uri, method, **kwargs):
    """Send one HTTP request to ``url + uri`` and return the reply.

    Extra keyword arguments are passed straight through to
    ``requests.request``. The script exits with status 1 when
    ``check_status`` rejects the response.

    :param url: base URL of the server.
    :param uri: path (and query string) appended to ``url``.
    :param method: HTTP method name.
    :return: the raw response object for ``DELETE`` requests, otherwise the
        decoded JSON body.
    """
    endpoint = f'{url}{uri}'
    response = requests.request(method, endpoint, **kwargs)
    if not check_status(response, False):
        exit(1)
    # DELETE callers get the response object itself rather than parsed JSON.
    return response if method == 'DELETE' else response.json()
def create_header(bearer_token):
    """Build the ``Authorization`` header dict that carries ``bearer_token``."""
    return {"Authorization": f"Bearer {bearer_token}"}
def print_system_info(response):
    """Print the system name, the number of servers and the server list.

    ``response`` is either the server list itself or a mapping that wraps
    the list under a ``'reply'`` key. The system name is taken from the
    first entry of the list.
    """
    servers = response['reply'] if 'reply' in response else response
    server_count = len(servers)
    name = servers[0]['systemName']
    print(f'System {name} contains {server_count} server(s):')
    pprint(servers)
def create_local_payload():
    """Return the JSON body for a local-user login request.

    The credentials come from the module-level ``LOCAL_USER`` and
    ``LOCAL_PASSWORD``; ``setCookie`` is left off.
    """
    return dict(username=LOCAL_USER, password=LOCAL_PASSWORD, setCookie=False)
def is_local_user(api_response):
    """Tell whether the described user may use VMS Server sessions.

    Every user whose ``type`` is not ``'cloud'`` qualifies (local and LDAP
    accounts alike). The previous version returned ``None`` for any
    non-cloud user not literally named ``admin``, so callers testing
    ``not is_local_user(...)`` rejected valid local and LDAP accounts.

    :param api_response: user-info mapping with ``username`` and ``type``
        keys.
    :return: ``True`` for a non-cloud user, ``False`` for a cloud user.
    :raises KeyError: if a required key is missing from ``api_response``.
    """
    # Kept from the original: the account named 'admin' is accepted
    # without looking at its type.
    if api_response['username'] == 'admin':
        return True
    return api_response['type'] != 'cloud'
def get_cloud_system_id(api_response):
    """Return the value stored under ``'cloudId'`` in ``api_response``."""
    return api_response['cloudId']
def is_expired(api_response):
    """Return ``True`` when a session-info reply reports under one second left.

    Reads the ``expiresInS`` field. This example previously called the
    helper without defining it.
    """
    return int(api_response['expiresInS']) < 1


def main():
    """Walk through the bearer-token flow for a local or LDAP user.

    Steps: check the user type, open two sessions, validate both tokens,
    call a protected endpoint with the primary token, then terminate the
    sessions. Exits with status 1 as soon as a check fails.

    Every request is sent with ``verify=False``, i.e. TLS certificate
    verification is disabled (presumably because the server uses a
    self-signed certificate; enable verification where possible).
    """
    # STEP 1: make sure the account is not a Cloud user.
    cloud_state = request_api(LOCAL_URL, f'/rest/v1/login/users/{LOCAL_USER}', 'GET', verify=False)
    if not is_local_user(cloud_state):
        print(LOCAL_USER + ' is not a local user.')
        exit(1)

    # STEP 2: log in twice; the second session is used to delete the first.
    payload = create_local_payload()
    primary_session = request_api(LOCAL_URL, '/rest/v1/login/sessions', 'POST', verify=False, json=payload)
    primary_token = primary_session['token']
    secondary_session = request_api(LOCAL_URL, '/rest/v1/login/sessions', 'POST', verify=False, json=payload)
    secondary_token = secondary_session['token']

    # STEP 3: confirm both tokens are still valid.
    for token in (primary_token, secondary_token):
        token_info = request_api(LOCAL_URL, f'/rest/v1/login/sessions/{token}', 'GET', verify=False)
        if is_expired(token_info):
            print('Expired token')
            exit(1)

    # STEP 4: call a protected endpoint with the primary token.
    get_method_header = create_header(primary_token)
    system_info = request_api(LOCAL_URL, '/rest/v1/servers/*/info', 'GET', verify=False,
                              headers=get_method_header)
    print_system_info(system_info)

    # STEP 5: terminate the primary session using the secondary token, then
    # close the secondary session itself so that no session is left open.
    delete_method_header = create_header(secondary_token)
    request_api(LOCAL_URL, f'/rest/v1/login/sessions/{primary_token}', 'DELETE', verify=False,
                headers=delete_method_header)
    request_api(LOCAL_URL, f'/rest/v1/login/sessions/{secondary_token}', 'DELETE', verify=False,
                headers=delete_method_header)


if __name__ == '__main__':
    main()
You can also find this example in the GitHub repository.
Cloud Users
The following steps demonstrate how to authenticate API requests for Cloud users:
- Check if the system is connected to the Cloud and get its Cloud System ID from the VMS Server. If the Cloud System ID is not listed, the system is not connected to the Cloud and does not have any Cloud users.
- [Optional] Check the user type on the VMS Server. Only Cloud users may use Cloud Sessions.
Note: The methods field of the response will not contain sessions because the VMS Server does not issue sessions for Cloud users.
- Cloud users must obtain two bearer tokens from the Cloud: one to initiate the API calls and another to delete the first token when finished.
- [Optional] Check if the session token is valid on the VMS Server. For cookie-based authentication, add the ?setCookie=true URL parameter. Afterward, all web browser requests to the VMS Server will be authorized automatically.
- Execute any request which requires authentication with a bearer token on the VMS Server.
- Cloud sessions can be terminated on the Cloud when they are no longer needed.
Click to see the code example
import requests
from pprint import pprint
# VMS Server connection settings.
LOCAL_USER = 'admin' # local account username
LOCAL_PASSWORD = 'pass123' # local account password
LOCAL_URL = 'https://localhost:7001' # https://<server_ip>:<server_port> or https://{system_id}.relay.vmsproxy.com
# Cloud account and cloud service settings.
CLOUD_USER = 'user@gmail.com' # cloud account email
CLOUD_PASSWORD = 'pass123' # cloud account password
CLOUD_DOMAIN_NAME = 'nxvms.com' # cloud service domain name
# Base URL of the cloud service, derived from the domain name above.
CLOUD_URL = 'https://' + CLOUD_DOMAIN_NAME
def check_status(request, verbose):
    """Return ``True`` when the response status is 200 (OK), else ``False``.

    A successful body is printed only when ``verbose`` is truthy; a failed
    request always prints its URL, status code and body.

    :param request: response object (despite the name) exposing
        ``status_code``, ``text`` and ``url``.
    :param verbose: print the body of successful responses when truthy.
    """
    if request.status_code != requests.codes.ok:
        print(request.url + " Request error {0}\n{1}".format(request.status_code, request.text))
        return False
    if verbose:
        print("Request successful\n{0}".format(request.text))
    return True
def request_api(url, uri, method, **kwargs):
    """Perform an HTTP request against ``url + uri``.

    Keyword arguments are handed unchanged to ``requests.request``. When
    ``check_status`` rejects the response the script exits with status 1.

    :return: the response object for ``DELETE``, the parsed JSON body for
        every other method.
    """
    response = requests.request(method, f'{url}{uri}', **kwargs)
    if not check_status(response, False):
        exit(1)
    if method != 'DELETE':
        return response.json()
    return response
def create_header(bearer_token):
    """Return request headers that authorize with ``bearer_token``."""
    authorization = f"Bearer {bearer_token}"
    return {"Authorization": authorization}
def print_system_info(response):
    """Print a one-line summary followed by the full server list.

    Accepts either the bare list of servers or a mapping holding that list
    under ``'reply'``; the system name is read from the first list entry.
    """
    if 'reply' in response:
        response = response['reply']
    total = len(response)
    first_server = response[0]
    print(f"System {first_server['systemName']} contains {total} server(s):")
    pprint(response)
def create_cloud_payload(cloud_system_id):
    """Build the token-request body sent to the cloud for one system.

    The credentials are the module-level ``CLOUD_USER`` and
    ``CLOUD_PASSWORD``; ``cloud_system_id`` is embedded in the ``scope``
    value.
    """
    payload = {
        'grant_type': 'password',
        'response_type': 'token',
        'client_id': '3rdParty',
    }
    payload['scope'] = f'{CLOUD_URL}/cdb/oauth2/token cloudSystemId={cloud_system_id}'
    payload['username'] = CLOUD_USER
    payload['password'] = CLOUD_PASSWORD
    return payload
def get_cloud_system_id(api_response):
    """Extract the ``'cloudId'`` entry from a system-info reply."""
    return api_response['cloudId']
def has_cloud_host(api_response):
    """Check that the reply's ``'cloudHost'`` equals ``CLOUD_DOMAIN_NAME``."""
    reported_host = api_response['cloudHost']
    return reported_host == CLOUD_DOMAIN_NAME
def is_cloud_user(api_response):
    """Return ``True`` only when the user-info reply has ``type == 'cloud'``."""
    return api_response['type'] == 'cloud'
def get_token(api_response):
    """Return the ``access_token`` of a token reply that is still valid.

    If ``expires_in`` is below one, prints ``Expired token`` and exits the
    script with status 1.
    """
    seconds_left = int(api_response['expires_in'])
    if seconds_left >= 1:
        return api_response['access_token']
    print('Expired token')
    exit(1)
def is_expired(api_response):
    """Return ``True`` when ``expiresInS`` in the reply is below one second."""
    return int(api_response['expiresInS']) < 1
def main():
    """Walk through the bearer-token flow for a Cloud user.

    Steps: read the Cloud System ID from the VMS Server, check the user
    type, obtain two tokens from the cloud, validate the primary token on
    the VMS Server, call a protected endpoint with it, and finally delete
    the primary token on the cloud using the secondary one. Exits with
    status 1 as soon as a check fails.
    """
    # STEP 1
    cloud_state = request_api(LOCAL_URL, '/rest/v1/system/info', 'GET', verify=False)
    if not has_cloud_host(cloud_state):
        print('Not a cloud-connected system')
        exit(1)
    cloud_system_id = get_cloud_system_id(cloud_state)

    # STEP 2
    account = request_api(LOCAL_URL, f'/rest/v1/login/users/{CLOUD_USER}', 'GET', verify=False)
    if not is_cloud_user(account):
        print(CLOUD_USER + ' is not a cloud user.')
        exit(1)

    # STEP 3: two identical token requests, primary first, secondary second.
    tokens = []
    for _ in range(2):
        oauth_payload = create_cloud_payload(cloud_system_id)
        oauth_response = request_api(CLOUD_URL, '/cdb/oauth2/token', 'POST', json=oauth_payload)
        tokens.append(get_token(oauth_response))
    primary_token, secondary_token = tokens

    # STEP 4
    session_info = request_api(LOCAL_URL, f'/rest/v1/login/sessions/{primary_token}', 'GET', verify=False)
    if is_expired(session_info):
        print('Expired token')
        exit(1)

    # STEP 5
    servers = request_api(LOCAL_URL, '/rest/v1/servers/*/info', 'GET',
                          headers=create_header(primary_token), verify=False)
    print_system_info(servers)

    # STEP 6: the secondary token authorizes deletion of the primary one.
    request_api(CLOUD_URL, f'/cdb/oauth2/token/{primary_token}', 'DELETE',
                headers=create_header(secondary_token))


if __name__ == '__main__':
    main()
You can also find this example in the GitHub repository.
Deprecated Authentication in Nx Witness
HTTP Basic/Digest and URL-based authentication can only be used if they have been enabled for certain users. API requests that require the owner’s password do not work with the deprecated authentication methods.
The owner can check if the deprecated methods are enabled for a user:
https://localhost:7001/rest/v1/login/users/{username}
To enable digest authentication for a user, open User Management and click on a user:
- Click the more options icon
- Click Allow digest authentication for this user.
Please review the VMS Server API documentation for more information on how to properly utilize parameters and execute the API requests.
HTTP Basic and Digest Authentication
Basic authentication uses the username and password of a VMS Server account (unencrypted), while Digest authentication uses the digest parameter (an MD5 hash of the username, realm, and password).
The following code example uses HTTP Digest authentication but can be changed to use HTTP Basic for compatible API requests by replacing auth=HTTPDigestAuth with auth=HTTPBasicAuth. The digest is calculated and passed as an argument to the API request.
Local and LDAP Users
For Local and LDAP users, define the global variables according to your local server’s URL and account credentials.
Click to see the code example
import requests
from requests.auth import HTTPBasicAuth, HTTPDigestAuth
from pprint import pprint
# Account, server and request settings used by main(); edit them to match your setup.
USERNAME = 'admin' # local account username
PASSWORD = 'pass123' # local account password
URL = 'https://localhost:7001' # https://<server_ip>:<server_port>
URI = '/api/moduleInformation?allModules=true' # API request URI
METHOD = 'GET' # API request method
def check_status(request, verbose):
    """Tell whether a response has status 200 (OK), printing diagnostics.

    :param request: response object (despite the name) exposing
        ``status_code``, ``text`` and ``url``.
    :param verbose: when truthy, also print the body of a successful
        response.
    :return: ``True`` on status 200; otherwise the URL, status and body
        are printed and ``False`` is returned.
    """
    is_ok = request.status_code == requests.codes.ok
    if is_ok and verbose:
        print("Request successful\n{0}".format(request.text))
    if not is_ok:
        print(request.url + " Request error {0}\n{1}".format(request.status_code, request.text))
    return is_ok
def request_api(url, uri, method, **kwargs):
    """Send a request to ``url + uri`` and return the decoded JSON body.

    Keyword arguments (for example ``auth`` or ``verify``) go straight to
    ``requests.request``. The script exits with status 1 when
    ``check_status`` rejects the response.
    """
    response = requests.request(method, f'{url}{uri}', **kwargs)
    if check_status(response, False):
        return response.json()
    exit(1)
def print_system_info(response):
    """Print the system name and server count, then the whole response.

    ``response['reply']`` may be a list of server records or a single
    record. The type check uses ``isinstance`` rather than an exact
    ``type(...) == list`` comparison, so list subclasses are also treated
    as lists.

    :param response: mapping with a ``'reply'`` entry.
    :raises KeyError: if ``'reply'`` or ``'systemName'`` is missing.
    :raises IndexError: if the reply is an empty list.
    """
    system_info = response['reply']
    if isinstance(system_info, list):
        number_of_servers = len(system_info)
        # Only the first record is consulted for the system name.
        system_name = system_info[0]['systemName']
    else:
        number_of_servers = 1
        system_name = system_info['systemName']
    print(f'System {system_name} contains {number_of_servers} server(s):')
    pprint(response)
def main():
    """Fetch ``URI`` from ``URL`` with HTTP Digest auth and print the result."""
    digest_auth = HTTPDigestAuth(USERNAME, PASSWORD)
    reply = request_api(URL, URI, METHOD, auth=digest_auth, verify=False)
    print_system_info(reply)


if __name__ == '__main__':
    main()
You can also find this example in the GitHub repository.
Cloud Users
Use the above code example for reference, as it can be reused with only a few changes to the global variables. For cloud users, define CLOUD_USER and CLOUD_PASSWORD according to your cloud account credentials.
# Cloud account credentials.
CLOUD_USER = 'user@gmail.com' # cloud account email
CLOUD_PASSWORD = 'pass123' # cloud account password
For Cloud API requests, CLOUD_URL should be defined as your cloud portal domain and URI as an appropriate cloud API URI.
# Settings for a request sent directly to the cloud service.
CLOUD_URL = 'https://nxvms.com' # cloud service URL
URI = '/cdb/system/get' # API request URI
METHOD = 'GET' # API request method
For Local server API requests routed via the Cloud, CLOUD_URL should be defined as the cloud relay service URL containing your Cloud System ID, and URI should be defined as an appropriate Local API URI.
# Settings for a local server request routed through the cloud relay.
# NOTE: '{system_id}' is a literal placeholder (this is not an f-string);
# replace it with your Cloud System ID.
CLOUD_URL = 'https://{system_id}.relay.vmsproxy.com' # CloudSystemID via cloud relay
URI = '/api/moduleInformation?allModules=true' # API request URI
METHOD = 'GET' # API request method
You can also find this example in the GitHub repository.
URL-based authentication
This method uses credentials passed as part of the request URL. It should only be used if other methods are not available for your purpose (e.g., WebM direct link).
Create the auth_digest parameter by generating multiple MD5 hashes using the required info and base64 encoding the final result:
- digest = md5_hex(user_name + ":" + realm + ":" + password)
- partial_ha2 = md5_hex(method + ":")
- simplified_ha2 = md5_hex(digest + ":" + nonce + ":" + partial_ha2)
- auth_digest = base64(user_name + ":" + nonce + ":" + simplified_ha2)
Note: user_name must be in lowercase.
The resulting value is passed as an argument to the API request and appended to the URL.
Click to see the code example
import requests
import hashlib
import base64
from pprint import pprint
# Account, server and request settings; METHOD is also hashed into the auth digest.
USERNAME = 'admin' # local account username or cloud email
PASSWORD = 'pass123' # local or cloud account password
URL = 'https://localhost:7001' # https://<server_ip>:<server_port>
URI = '/api/moduleInformation?allModules=true' # API request URI
METHOD = 'GET' # API request method
def md5(data):
    """Return the hexadecimal MD5 digest of the string ``data``.

    ``data`` is encoded with ``str.encode()`` defaults before hashing.
    """
    return hashlib.md5(data.encode()).hexdigest()
def digest(login, password, realm, nonce, method):
    """Compute the base64-encoded value used for URL-based authentication.

    The login is lower-cased first. Three MD5 hashes are chained:
    ``login:realm:password``, ``method:`` and the combination of both with
    the nonce. The result ``login:nonce:<final hash>`` is base64-encoded.

    :return: the encoded value as ``bytes``.
    """
    user = login.lower()
    credentials_hash = md5(f"{user}:{realm}:{password}")
    method_hash = md5(f"{method}:")
    response_hash = md5(f"{credentials_hash}:{nonce}:{method_hash}")
    raw = f"{user}:{nonce}:{response_hash}".encode()
    return base64.b64encode(raw)
def check_status(request, verbose):
    """Return whether a response has status 200 (OK), printing diagnostics.

    :param request: response object (despite the name) exposing
        ``status_code``, ``text`` and ``url``.
    :param verbose: when truthy, print the body of a successful response.
    :return: ``True`` on status 200; otherwise the failure is printed and
        ``False`` is returned.
    """
    if request.status_code != requests.codes.ok:
        print(request.url + " Request error {0}\n{1}".format(request.status_code, request.text))
        return False
    if verbose:
        print("Request successful\n{0}".format(request.text))
    return True
def create_auth():
    """Fetch a nonce from the server and build the ``?auth=...`` URL suffix.

    The realm and nonce are read from the ``reply`` of ``/api/getNonce``;
    the digest is computed for ``USERNAME``, ``PASSWORD`` and ``METHOD``.

    :return: the string ``'?auth=<base64 digest>'``.
    """
    # The nonce is always fetched with GET. METHOD describes the request
    # being signed and only belongs in the digest; the previous version
    # also used it for this call, which breaks when METHOD is not 'GET'.
    response = request_api(URL, '/api/getNonce', 'GET', verify=False)
    reply = response['reply']
    auth = digest(USERNAME, PASSWORD, reply['realm'], reply['nonce'], METHOD).decode('utf-8')
    return f'?auth={auth}'
def request_api(url, uri, method, auth='', **kwargs):
server_url = f'{url}{uri}{auth}'
response = requests.request(method, server_url, **kwargs)
if not check_status(response, False):
exit(1)
return response.json()
def print_system_info(response):
    """Print the system name and server count, then the whole response.

    ``response['reply']`` is either a list of server records or a single
    record. ``isinstance`` replaces the exact ``type(...) == list``
    comparison, so list subclasses are handled as lists too.

    :param response: mapping with a ``'reply'`` entry.
    :raises KeyError: if ``'reply'`` or ``'systemName'`` is missing.
    :raises IndexError: if the reply is an empty list.
    """
    system_info = response['reply']
    if isinstance(system_info, list):
        number_of_servers = len(system_info)
        # The system name is read from the first record only.
        system_name = system_info[0]['systemName']
    else:
        number_of_servers = 1
        system_name = system_info['systemName']
    print(f'System {system_name} contains {number_of_servers} server(s):')
    pprint(response)
def main():
    """Request ``URI`` from ``URL`` with URL-based auth and print the result."""
    auth_suffix = create_auth()
    reply = request_api(URL, URI, METHOD, auth_suffix, verify=False)
    print_system_info(reply)


if __name__ == '__main__':
    main()
You can also find this example in the GitHub repository.
Comments
0 comments
Article is closed for comments.