#!/usr/bin/env python3 # # Copyright 2022, Mischa Peters , Alkira. # push.py # Version 0.1 - 20220617 - initial release # Version 0.2 - 20220620 - added collection of credentialId # Version 0.3 - 20220708 - config everywhere!! # # Permission to use, copy, modify, and distribute this software for any # purpose with or without fee is hereby granted, provided that the above # copyright notice and this permission notice appear in all copies. # # THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF # OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. # import os import sys import re import json import time import logging import requests import configparser import argparse # Parse all arguments parser = argparse.ArgumentParser(description="Push JSON files to AlkiraAPI configured in a cnf config file") parser.add_argument("-t", "--tenant", type=str, default='alkira.cnf', help="location of alikira.cnf (default: alkira.cnf)") parser.add_argument("-f", "--folder", type=str, default='config', help="location of the JSON connector files (default: config)") parser.add_argument("-c", "--config", type=str, default='connectors.cnf', help="location of the connector config (default: config/connectors.cnf)") parser.add_argument("-v", "--verbose", type=int, default=0, help="Verbose level 0 or 1 (default: 0)") try: args = parser.parse_args() ALKIRA_CONFIG = args.tenant connector_folder = args.folder CONNECTOR_CONFIG = f'{args.folder}/{args.config}' except argparse.ArgumentError as e: print(str(e)) sys.exit() try: loglevel = { 0: logging.INFO, 1: logging.DEBUG }[args.verbose] except KeyError: loglevel = logging.INFO ############################################### # Set loglevel (logging.INFO, logging.DEBUG) logging.basicConfig(level=loglevel) logging = logging.getLogger('AlkiraAPI') # Tenant config if not os.path.isfile(ALKIRA_CONFIG): logging.error(f"The config file {ALKIRA_CONFIG} doesn't exist") sys.exit(1) alkira = configparser.RawConfigParser() alkira.read(ALKIRA_CONFIG) ALKIRA_TENANT = alkira.get('alkira', 'ALKIRA_TENANT') ALKIRA_USERNAME = alkira.get('alkira', 'ALKIRA_USERNAME') ALKIRA_PASSWORD = alkira.get('alkira', 'ALKIRA_PASSWORD') ALKIRA_BASE_URI = f'https://{ALKIRA_TENANT}/api' AWS_SERVICE_USERNAME = alkira.get('services', 'AWS_SERVICE_USERNAME') AZURE_SERVICE_USERNAME = alkira.get('services', 'AZURE_SERVICE_USERNAME') SERVICE_PASSWORD = alkira.get('services', 'SERVICE_PASSWORD') CIDR_NAME = alkira.get('globalcidr', 'CIDR_NAME') CIDR_DESCR = alkira.get('globalcidr', 'CIDR_DESCR') CIDR_PREFIX = alkira.get('globalcidr', 'CIDR_PREFIX') CIDR_CXP = alkira.get('globalcidr', 'CIDR_CXP') # Connector config if not os.path.isfile(CONNECTOR_CONFIG): logging.error(f"The config file {CONNECTOR_CONFIG} doesn't exist") sys.exit(1) config = configparser.ConfigParser() config.read(CONNECTOR_CONFIG) ############################################### # Set default headers headers = {'Content-Type': "application/json"} # URL Exceptions url_exceptions = { "saas": "internet", "pan": "panfw", "ftntfw": "ftnt-fw-", "chkpfw": "chkp-fw-", "ocivcnconnectors": "oci-vcn-connectors", "ftntfwservices": "ftnt-fw-services", "chkpfwservices": "chkp-fw-services" } # URL Exceptions creating credentials service_credentials = { "panfwservices": "pan", "ftntfwservices": "ftntfw", "chkpfwservices": "chkp-fw" } # URL Exceptions creating instance credentials service_instance_credentials = { "ftntfwservices": "ftntfw-", "chkpfwservices": "chkp-fw-" } # Global CIDR service_global_cidr = [ "chkpfwservices" ] # Credential Types credential_types = { "awsvpc": "", "azurevnet": "", "gcpvpc": "", "ocivcn": "", } def alkira_login(): body = {'userName': ALKIRA_USERNAME, 'password': ALKIRA_PASSWORD} session = requests.session() response = alkira_post(session, '/login', body) return session def alkira_post(session, uri, body): url = f'{ALKIRA_BASE_URI}{uri}' try: response = session.post(url, data=json.dumps(body), headers=headers) match int(str(response.status_code)[0]): case 4: logging.error(response.content) response.raise_for_status() except Exception as e: logging.error(f'Error: {str(e)}') sys.exit(1) return response def alkira_get(session, uri): url = f'{ALKIRA_BASE_URI}{uri}' try: response = session.get(url, headers=headers) response.raise_for_status() except Exception as e: logging.error(f'Error: {str(e)}') sys.exit(1) return response def alkira_delete(session, uri): url = f'{ALKIRA_BASE_URI}{uri}' try: response = session.delete(url, headers=headers) response.raise_for_status() except Exception as e: logging.error(f'Error: {str(e)}') sys.exit(1) return response def alkira_service(session, connector_name): fwcredential = f'fwcredential-{time.time()}' body = { "credentials": { "userName": AWS_SERVICE_USERNAME, "password": SERVICE_PASSWORD }, "name": fwcredential } logging.debug(f'Received Connector: {connector_name}') logging.info('=== Create Service Credentials') if connector_name in service_credentials.keys(): credentials_url = service_credentials[connector_name] logging.debug(f'URL: {credentials_url}') response = alkira_post(session, f'/credentials/{credentials_url}', body) json_body = response.json() if response.status_code == 200: service_credentialid = json_body['id'] logging.debug(f'credentialId: {service_credentialid}') logging.info('=== Create Instance Credentials') if connector_name in service_instance_credentials.keys(): credentials_url = service_instance_credentials[connector_name] logging.debug(f'URL: {credentials_url}') response = alkira_post(session, f'/credentials/{credentials_url}instance', body) json_body = response.json() if response.status_code == 200: service_instance_credentialid = json_body['id'] logging.debug(f'instance credentialId: {service_instance_credentialid}') return service_credentialid, service_instance_credentialid def alkira_global_cidr(session, connector_name): body = { "name": CIDR_NAME, "description": CIDR_DESCR, "values": [ CIDR_PREFIX ], "cxp": CIDR_CXP } logging.debug(f'Received Connector: {connector_name}') logging.info('=== Create Global CIDR') response = alkira_post(session, f'/tenantnetworks/{tenantNetworkId}/global-cidr-lists', body) json_body = response.json() if response.status_code == 201: global_cidr_id = json_body['id'] logging.debug(f'global cidr id: {global_cidr_id}') return global_cidr_id # Authenticate logging.info('=== Authenticating') s = alkira_login() logging.debug(s) # Get TenantID logging.info('=== Fetching Tenant Info') r = alkira_get(s, '/tenantnetworks') data = r.json() tenantNetworkId = data[0]['id'] tenantName = data[0]['name'] logging.info(f'Tenant Name: {tenantName}') logging.info(f'Tenant ID: {tenantNetworkId}') # Get Credentials logging.info('=== Fetching Credentials') r = alkira_get(s, '/credentials') data = r.json() logging.debug(json.dumps(data)) for key in data: if key['credentialType'].lower() in credential_types: logging.debug(f"CredentialType: {key['credentialType']} - CredentialId: {key['credentialId']}") credential_types[key['credentialType'].lower()] = key['credentialId'] # Push connectors logging.info('=== Push Connectors') for connector in config.sections(): section = config[connector] connector_result = re.match(r'(\w+)(connectors|services)(\d+)', connector) connector_type = connector_result.group(1) connector_name = f'{connector_type}{connector_result.group(2)}' connector_number = connector_result.group(3) logging.debug(f'{connector_folder}/{connector_name}{connector_number}.txt') config_path = (f'{connector_folder}/{connector_name}{connector_number}.txt') if 'service' in connector_name: service_credentialid, service_instance_credentialid = alkira_service(s, connector_name) logging.debug(f'Got credentialId: {service_credentialid} AND {service_instance_credentialid}') if connector_name in service_global_cidr: service_global_cidr_id = alkira_global_cidr(s, connector_name) logging.debug(f'Got global cidr id: {service_global_cidr_id}') with open (config_path, 'r') as f: body = json.load(f) if 'connectors' in connector_name and connector_type in credential_types and credential_types[connector_type]: if 'credentialId' in body: logging.debug(f"JSON credentialid: {body['credentialId']}") logging.debug(f'API credentialid: {credential_types[connector_type]}') body['credentialId'] = credential_types[connector_type] for key in ['cxp', 'size', 'group']: if key in body: logging.debug(f"JSON {key}: {body[key]}") if key in section: try: if alkira.get(key, section[key]): logging.debug(f"CNF CONFIG {key}: {alkira.get(key, section[key])}") body[key] = alkira.get(key, section[key]) except: logging.debug(f"{key} not defined in main config") logging.debug(f"CONFIG {key}: {section[key]}") body[key] = section[key] for key in ['segments', 'billingTags']: if key in body: logging.debug(f"JSON {key}: {body[key][0]}") if key in section: try: if alkira.get(key, section[key]): logging.debug(f"CNF CONFIG {key}: {alkira.get(key, section[key])}") body[key][0] = alkira.get(key, section[key]) except: logging.debug(f"{key} not defined in main config") logging.debug(f"CONFIG key: {section[key]}") body[key][0] = section[key] if 'services' in connector_name and 'credentialId' in body and 'service_credentialid' in locals(): logging.debug(f'API credentialid: {service_credentialid}') body['credentialId'] = service_credentialid if 'services' in connector_name and 'instances' in body and 'credentialId' in body['instances'][0] and 'service_instance_credentialid' in locals(): logging.debug(f'API instance credentialid: {service_instance_credentialid}') body['instances'][0]['credentialId'] = service_instance_credentialid if 'services' in connector_name and 'managementServer' in body and 'globalCidrListId' in body['managementServer'] and 'service_global_cidr_id' in locals(): logging.debug(f'API globalCidrListId: {service_global_cidr_id}') body['managementServer']['globalCidrListId'] = service_global_cidr_id logging.debug(json.dumps(body)) logging.info(f"=== Pushing {body['name'][:30]} ({connector_name}) to {body['cxp']} (size: {body['size']}; segment: {body['segments'][0]})") logging.debug(f'CONNECTOR BEFORE AGAIN: {connector_name}') if connector_name in url_exceptions.keys(): connector_name = url_exceptions[connector_name] logging.debug(f'CONNECTOR AFTER AGAIN: {connector_name}') r = alkira_post(s, f'/tenantnetworks/{tenantNetworkId}/{connector_name}', body) logging.info(r.status_code) logging.debug(r.content)