netskope/Netskope_ZScalerImporter-01.py

146 lines
4.8 KiB
Python
Executable File

#!/usr/bin/env python3
#
# Copyright 2019-2020, Mischa Peters <mischa AT netskope DOT com>, Netskope.
# Netskope_ZScalerImporter.py - Version 2.0 - 20200611
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#
# ZScaler integration with Netskope
#
import os
import sys
import re
import json
import csv
import time
import logging
import urllib.parse
import requests
import configparser
###############################################
CONFIG_FILE = "/home/mischa/netskope/netskope.cnf"
if not os.path.isfile(CONFIG_FILE):
logging.error(f"The config file {CONFIG_FILE} doesn't exist")
sys.exit(1)
config = configparser.RawConfigParser()
config.read(CONFIG_FILE)
NTSKP_TENANT = config.get('netskope', 'NTSKP_TENANT')
NTSKP_TOKEN = config.get('netskope', 'NTSKP_TOKEN')
ZS_MAX_DOMAINS = int(config.get('zscaler', 'ZS_MAX_DOMAINS'))
ZS_BASE_URI = config.get('zscaler', 'ZS_BASE_URI')
ZS_API_KEY = config.get('zscaler', 'ZS_API_KEY')
ZS_API_USERNAME = config.get('zscaler', 'ZS_API_USERNAME')
ZS_API_PASSWORD = config.get('zscaler', 'ZS_API_PASSWORD')
ZS_CATEGORY_NAME = config.get('zscaler', 'ZS_CATEGORY_NAME')
ZS_CATEGORY_DESC = config.get('zscaler', 'ZS_CATEGORY_DESC')
PROXY = config.get('general', 'PROXY')
###############################################
# Use a custom user-agent string
UA_STRING = 'Netskope_ZScalerImporter1.0'
# Set logging.INFO to logging.DEBUG for debug information
logging.basicConfig(level=logging.DEBUG)
logging = logging.getLogger('Netskope_ZScalerImporter')
def ntskp_get_domains():
ioc_list = []
with open('zscaler.txt') as f:
ioc_list = f.read().splitlines()
logging.debug(ioc_list[:ZS_MAX_DOMAINS])
return ioc_list[:ZS_MAX_DOMAINS]
def zs_auth(headers):
# Authenticatie against ZScaler API, fetch and return JSESSIONID
now = int(time.time() * 1000)
n = str(now)[-6:]
r = str(int(n) >> 1).zfill(6)
key = ""
for i in range(0, len(str(n)), 1):
key += ZS_API_KEY[int(str(n)[i])]
for j in range(0, len(str(r)), 1):
key += ZS_API_KEY[int(str(r)[j])+2]
uri = f'{ZS_BASE_URI}/authenticatedSession'
body = {'apiKey': key,
'username': ZS_API_USERNAME,
'password': ZS_API_PASSWORD,
'timestamp': now}
try:
r = requests.post(uri, data=json.dumps(body), headers=headers, proxies=PROXY)
r.raise_for_status()
jsessionid = re.sub(r';.*$', "", r.headers['Set-Cookie'])
except Exception as e:
logging.error(f'Error: {str(e)}')
sys.exit(1)
return jsessionid
def zs_get_categories(headers):
# Find any existing categories matching ZS_CATEGORY_NAME
uri = f'{ZS_BASE_URI}/urlCategories/lite'
try:
r = requests.get(uri, headers=headers, proxies=PROXY)
r.raise_for_status()
except Exception as e:
logging.error(f'Error: {str(e)}')
sys.exit(1)
data = r.json()
for item in data:
if item.get('configuredName') == ZS_CATEGORY_NAME:
return item.get('id')
return None
def zs_update_categories(headers, domains, id = None):
# Update the ZS_CATEGORY_NAME with blocklist from Netskope
description = f'{ZS_CATEGORY_DESC}\n\nLast Updated: {str(time.ctime(int(time.time())))}'
body = {'configuredName': ZS_CATEGORY_NAME,
'customCategory': 'true',
'superCategory': 'SECURITY',
'urls': domains,
'description': description}
try:
if id == None:
uri = f'{ZS_BASE_URI}/urlCategories'
r = requests.post(uri, json=body, headers=headers, proxies=PROXY)
else:
uri = f'{ZS_BASE_URI}/urlCategories/{str(id)}'
r = requests.put(uri, json=body, headers=headers, proxies=PROXY)
r.raise_for_status()
except Exception as e:
logging.error(f'Error: {str(e)}')
sys.exit(1)
return None
def zs_logout(headers):
# Logout from ZScaler
uri = f'{ZS_BASE_URI}/authenticatedSession'
try:
r = requests.delete(uri, headers=headers, proxies=PROXY)
r.raise_for_status()
except Exception as e:
logging.error(f'Error: {str(e)}')
sys.exit(1)
return None
##############################################
request_headers = {'Content-Type': 'application/json', 'Cache-Control': 'no-cache', 'User-Agent': UA_STRING}
domains = ntskp_get_domains()
request_headers['Cookie'] = zs_auth(request_headers)
zs_update_categories(request_headers, domains, zs_get_categories(request_headers))
zs_logout(request_headers)
logging.info(f'Netskope added {str(len(domains))} domains added to ZScaler custom URL category {ZS_CATEGORY_NAME}')