#!/usr/bin/env python3
#
# Copyright 2019-2020, Mischa Peters , Netskope.
# Netskope_ZScalerImporter.py - Version 2.0 - 20200611
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#
# ZScaler integration with Netskope
#
import configparser
import csv
import json
import logging
import os
import re
import sys
import time
import urllib.parse

import requests

# Set logging.INFO to logging.DEBUG for debug information.
# Logging is configured before the config file is read so that the
# "config file missing" error goes through the same logger as everything else.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('Netskope_ZScalerImporter')

###############################################
CONFIG_FILE = "/home/mischa/netskope/netskope.cnf"

if not os.path.isfile(CONFIG_FILE):
    logger.error(f"The config file {CONFIG_FILE} doesn't exist")
    sys.exit(1)

config = configparser.RawConfigParser()
config.read(CONFIG_FILE)

# NOTE(review): NTSKP_TENANT and NTSKP_TOKEN are read but not used anywhere
# in this file; they are kept so the required config keys stay the same.
NTSKP_TENANT = config.get('netskope', 'NTSKP_TENANT')
NTSKP_TOKEN = config.get('netskope', 'NTSKP_TOKEN')
ZS_MAX_DOMAINS = int(config.get('zscaler', 'ZS_MAX_DOMAINS'))
ZS_BASE_URI = config.get('zscaler', 'ZS_BASE_URI')
ZS_API_KEY = config.get('zscaler', 'ZS_API_KEY')
ZS_API_USERNAME = config.get('zscaler', 'ZS_API_USERNAME')
ZS_API_PASSWORD = config.get('zscaler', 'ZS_API_PASSWORD')
ZS_CATEGORY_NAME = config.get('zscaler', 'ZS_CATEGORY_NAME')
ZS_CATEGORY_DESC = config.get('zscaler', 'ZS_CATEGORY_DESC')
# A missing [general]/PROXY entry is treated the same as an empty one.
PROXY = config.get('general', 'PROXY', fallback='')
# requests expects a scheme -> proxy URL mapping, not a bare string.
# NOTE(review): assumes PROXY holds a single proxy URL used for both
# http and https traffic - confirm against the deployed config files.
PROXIES = {'http': PROXY, 'https': PROXY} if PROXY else None
###############################################

# Use a custom user-agent string
UA_STRING = 'Netskope_ZScalerImporter1.0'

# Seconds to wait on each ZScaler API call before giving up; without a
# timeout a stalled connection would hang the script indefinitely.
REQUEST_TIMEOUT = 60


def ntskp_get_domains():
    """Return up to ZS_MAX_DOMAINS entries read from 'zscaler.txt'.

    The file is read relative to the current working directory, one entry
    per line. Surrounding whitespace is stripped and blank lines are
    skipped so that empty strings are never submitted as URLs.

    Logs the error and exits with status 1 if the file cannot be read.
    """
    try:
        with open('zscaler.txt', encoding='utf-8') as f:
            lines = f.read().splitlines()
    except (OSError, UnicodeError) as e:
        logger.error(f'Error: {str(e)}')
        sys.exit(1)

    ioc_list = [entry for entry in (line.strip() for line in lines) if entry]
    domains = ioc_list[:ZS_MAX_DOMAINS]
    logger.debug(domains)
    return domains


def _obfuscate_api_key(api_key, now):
    """Return the API key obfuscated with the millisecond timestamp `now`.

    The last six digits of `now`, and those six digits shifted right by one
    bit (zero-padded to six characters), are used as indexes into `api_key`;
    the second group is offset by 2. The result is a 12-character string.
    """
    n = str(now)[-6:]
    shifted = str(int(n) >> 1).zfill(6)
    first_half = ''.join(api_key[int(digit)] for digit in n)
    second_half = ''.join(api_key[int(digit) + 2] for digit in shifted)
    return first_half + second_half


def zs_auth(headers):
    """Authenticate against the ZScaler API and return the session cookie.

    Posts the obfuscated API key, username, password and timestamp to
    `/authenticatedSession` and returns the `Set-Cookie` response header
    with everything from the first ';' onwards removed.

    Logs the error and exits with status 1 if the request fails or the
    response carries no `Set-Cookie` header.
    """
    now = int(time.time() * 1000)
    key = _obfuscate_api_key(ZS_API_KEY, now)

    uri = f'{ZS_BASE_URI}/authenticatedSession'
    body = {'apiKey': key,
            'username': ZS_API_USERNAME,
            'password': ZS_API_PASSWORD,
            'timestamp': now}
    try:
        resp = requests.post(uri, data=json.dumps(body), headers=headers,
                             proxies=PROXIES, timeout=REQUEST_TIMEOUT)
        resp.raise_for_status()
        jsessionid = re.sub(r';.*$', "", resp.headers['Set-Cookie'])
    except Exception as e:
        logger.error(f'Error: {str(e)}')
        sys.exit(1)
    return jsessionid


def zs_get_categories(headers):
    """Return the id of the category named ZS_CATEGORY_NAME, or None.

    Fetches `/urlCategories/lite` and returns the `id` of the first item
    whose `configuredName` equals ZS_CATEGORY_NAME.

    Logs the error and exits with status 1 if the request fails or the
    response body is not valid JSON.
    """
    uri = f'{ZS_BASE_URI}/urlCategories/lite'
    try:
        resp = requests.get(uri, headers=headers, proxies=PROXIES,
                            timeout=REQUEST_TIMEOUT)
        resp.raise_for_status()
        # Decoded inside the try so a non-JSON reply is reported like any
        # other API failure instead of surfacing as a traceback.
        data = resp.json()
    except Exception as e:
        logger.error(f'Error: {str(e)}')
        sys.exit(1)

    for item in data:
        if item.get('configuredName') == ZS_CATEGORY_NAME:
            return item.get('id')
    return None


def zs_update_categories(headers, domains, id = None):
    """Create or update the ZS_CATEGORY_NAME category with `domains`.

    When `id` is None a new category is created with POST `/urlCategories`;
    otherwise the existing category is replaced with PUT
    `/urlCategories/{id}`. The description is ZS_CATEGORY_DESC followed by
    a "Last Updated" timestamp.

    Logs the error and exits with status 1 if the request fails.
    """
    description = f'{ZS_CATEGORY_DESC}\n\nLast Updated: {time.ctime()}'
    # NOTE(review): 'customCategory' is sent as the string 'true', not a
    # JSON boolean; left as-is because the API's handling is not visible here.
    body = {'configuredName': ZS_CATEGORY_NAME,
            'customCategory': 'true',
            'superCategory': 'SECURITY',
            'urls': domains,
            'description': description}
    try:
        if id is None:
            uri = f'{ZS_BASE_URI}/urlCategories'
            resp = requests.post(uri, json=body, headers=headers,
                                 proxies=PROXIES, timeout=REQUEST_TIMEOUT)
        else:
            uri = f'{ZS_BASE_URI}/urlCategories/{str(id)}'
            resp = requests.put(uri, json=body, headers=headers,
                                proxies=PROXIES, timeout=REQUEST_TIMEOUT)
        resp.raise_for_status()
    except Exception as e:
        logger.error(f'Error: {str(e)}')
        sys.exit(1)
    return None


def zs_logout(headers):
    """Log out from ZScaler by deleting the authenticated session.

    Logs the error and exits with status 1 if the request fails.
    """
    uri = f'{ZS_BASE_URI}/authenticatedSession'
    try:
        resp = requests.delete(uri, headers=headers, proxies=PROXIES,
                               timeout=REQUEST_TIMEOUT)
        resp.raise_for_status()
    except Exception as e:
        logger.error(f'Error: {str(e)}')
        sys.exit(1)
    return None


##############################################
def main():
    """Read the domain list, log in, push it to ZScaler, and log out."""
    request_headers = {'Content-Type': 'application/json',
                       'Cache-Control': 'no-cache',
                       'User-Agent': UA_STRING}

    domains = ntskp_get_domains()
    request_headers['Cookie'] = zs_auth(request_headers)
    try:
        zs_update_categories(request_headers, domains,
                             zs_get_categories(request_headers))
    finally:
        # The helpers exit via sys.exit() on failure; the finally clause
        # still runs, so the session is closed even when the update fails.
        zs_logout(request_headers)

    logger.info(f'Netskope added {str(len(domains))} domains added to ZScaler custom URL category {ZS_CATEGORY_NAME}')


if __name__ == '__main__':
    main()