netskope/Netskope_APIEvents-07.py

96 lines
3.2 KiB
Python
Executable File

#!/usr/bin/env python3
#
# Copyright 2019, Mischa Peters <mischa AT netskope DOT com>, Netskope.
# Version 1.0 - 20191107
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#
# Requires:
# - Python 3.x
#
import json
import urllib.request
import argparse
import sys
import urllib.parse
import re
import requests
# Command-line interface: two required positionals (tenant, token) plus
# optional tuning and diagnostic flags.
parser = argparse.ArgumentParser(
    description="Collect all page events from Netskope API and process domains by category and confidence"
)
parser.add_argument("tenant", type=str, help="Tenant Name (eg. ams.eu)")
parser.add_argument("token", type=str, help="Tenant API Token")
parser.add_argument(
    "-t", "--timeperiod", type=int, default=86400,
    help="Timeperiod 3600 | 86400 | 604800 | 2592000 (default: 86400)",
)
parser.add_argument(
    "-r", "--records", type=int, default=100,
    help="# of records (default: 100)",
)
parser.add_argument("-v", "--verbose", action='store_true', help="verbose")
parser.add_argument("-d", "--debug", action='store_true', help="print raw json data")

# parse_args() handles invalid input itself (prints usage and exits), so no
# exception handling is needed around it.
args = parser.parse_args()

# Expose the parsed options as module-level names used by the functions below.
tenant = args.tenant
token = args.token
timeperiod = args.timeperiod
records = args.records
verbose = args.verbose
debug = args.debug
# Filter configuration: an event is reported only when its "category" is in
# cct_list and its "ccl" value is in ccl_list.
cct_list = ["Cloud Storage", "Webmail"]
ccl_list = ["low", "poor"]
# Domains matching this pattern are never reported.
whitelist = re.compile("yahoo")
# Accumulates the unique domains that pass all filters.
ioc_list = []

if verbose:
    # Echo the active filter settings ...
    categories_text = ", ".join(str(category) for category in cct_list)
    ratings_text = ", ".join(str(rating) for rating in ccl_list)
    print("Using Categories: ", end='', flush=True)
    print(categories_text)
    print("Using Rating: ", end='', flush=True)
    print(ratings_text)
    print(f"Applying Whitelist: {whitelist.pattern}")
    print()
    # ... followed by the header of the per-domain result table.
    print(f"{'#':>4} {'Domain':<50s} Confidence")
    print("#######################################################################")
def get_json(type):
    """Fetch events of the given type from the tenant's v1 events API.

    The request URL is built from the module-level ``tenant``, ``token`` and
    ``timeperiod`` settings.  When the module-level ``debug`` flag is set,
    the decoded response is printed before being returned.

    Args:
        type: Event type to request (the script passes ``"page"``).  The
            name shadows the builtin but is kept so existing callers that
            pass it by keyword keep working.

    Returns:
        The decoded JSON response body.

    Raises:
        urllib.error.URLError: If the HTTP request fails.
        json.JSONDecodeError: If the response body is not valid JSON.
    """
    domain = "goskope.com"
    # URL-encode the query string so a token or type containing reserved
    # characters (&, =, +, spaces, ...) cannot corrupt the request.
    query = urllib.parse.urlencode(
        {"token": token, "type": type, "timeperiod": timeperiod}
    )
    url = f"https://{tenant}.{domain}/api/v1/events?{query}"
    req = urllib.request.Request(url)
    with urllib.request.urlopen(req) as response:
        content = response.read()
    json_data = json.loads(content)
    if debug:
        print(json_data)
    return json_data
def parse_json(json_content):
    """Collect the unique domains of events that match the configured filters.

    For every record in ``json_content['data']`` the domain is taken from the
    record's ``"domain"`` field or, when that field is absent, from the host
    part of its ``"url"`` field.  A domain is appended to the module-level
    ``ioc_list`` when it does not match ``whitelist``, the record's
    ``"category"`` is in ``cct_list``, its ``"ccl"`` is in ``ccl_list`` and
    the domain has not been collected before.  With ``verbose`` set, each
    newly collected domain is printed as a numbered table row.

    Records that lack a usable domain, category or ccl value are skipped
    rather than aborting the whole run.

    NOTE: the ``--records`` option is not applied here; the full list is
    returned.

    Args:
        json_content: Decoded API response; must contain a ``'data'`` list
            of event dicts.

    Returns:
        The module-level ``ioc_list`` (mutated in place), in first-seen order.

    Raises:
        KeyError: If ``json_content`` has no ``'data'`` entry.
    """
    # Set mirror of ioc_list for O(1) duplicate checks; the list keeps order.
    seen = set(ioc_list)
    count = 0
    for data in json_content['data']:
        if "domain" in data:
            domain = data["domain"]
        else:
            url = data.get("url") or ""
            domain = urllib.parse.urlparse(url).netloc
            if not domain and url:
                # urlparse only recognises a host when it is introduced by
                # "//"; retry so scheme-less URLs ("host/path") still yield
                # their host instead of an empty string.
                domain = urllib.parse.urlparse(f"//{url}").netloc
        # An empty/missing domain is not a reportable indicator.
        if not domain or whitelist.search(domain):
            continue
        ccl = data.get("ccl")
        if data.get("category") not in cct_list or ccl not in ccl_list:
            continue
        if domain in seen:
            continue
        seen.add(domain)
        count += 1
        if verbose:
            print(f"{count:>4}) {domain:<50s} {ccl}")
        ioc_list.append(domain)
    return ioc_list
# Fetch the page events and print the list of matching domains.  The response
# is bound to `events` rather than `json` so the imported json module, which
# get_json() relies on, is not shadowed.
events = get_json("page")
print(parse_json(events))