In this post, I’m going to configure a Google Cloud Function as an interface among Fortigate, Slack and Github. Once everything is deployed, every configuration change on the Fortigate is automatically notified to Slack and uploaded to Github for version control.
Fortigate Config Change Notification
Whenever a configuration change is made, Fortigate posts a notification to a Slack channel.
Fortigate automation is composed of three elements:
- automation trigger … available triggers: HA failover, config change, log, IOC, high CPU, conserve mode
- automation action … available actions: email, IP ban, AWS Lambda, webhook
- automation stitch … a combination of a trigger and an action
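On the Cloud Function side, the webhook action can be received with a small HTTP handler. Below is a minimal sketch (not the code from the original post): it assumes a Slack incoming-webhook URL in a SLACK_WEBHOOK_URL environment variable, and the JSON body fields (devname, rawlog) are illustrative assumptions about what the Fortigate webhook action is configured to send. The Github upload for version control is left to the full post.

import os

import requests


def fortigate_webhook(request):
    # HTTP-triggered Cloud Function entry point (receives the Fortigate webhook POST).
    # The body layout below is an assumption for illustration.
    payload = request.get_json(silent=True) or {}
    device = payload.get("devname", "fortigate")
    message = payload.get("rawlog", "configuration changed")

    # Notify the Slack channel about the config change.
    requests.post(os.environ["SLACK_WEBHOOK_URL"],
                  json={"text": "[%s] %s" % (device, message)})
    return "OK", 200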
Python 100 project #52: Cost Notification for GCP
I used to use AWS quite often, and I created a cost notification with Python on AWS Lambda and the Slack API. These past few months, though, I have been using GCP rather than AWS for personal reasons, so I created (almost) the same notification using Google Cloud Functions and the Slack API.
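As a rough idea of the shape of such a function (not the original post’s code), here is a minimal sketch of a scheduled, Pub/Sub-triggered Cloud Function that sums yesterday’s cost from a BigQuery billing export table and posts it to Slack; the table name and the SLACK_WEBHOOK_URL environment variable are assumptions.

import os

import requests
from google.cloud import bigquery

# Hypothetical billing export table (enable the BigQuery billing export to get one).
BILLING_TABLE = "my-project.billing.gcp_billing_export_v1_XXXXXX"


def post_daily_cost(event, context):
    # Triggered every morning by Cloud Scheduler via Pub/Sub.
    client = bigquery.Client()
    query = """
        SELECT SUM(cost) AS total
        FROM `%s`
        WHERE DATE(usage_start_time) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
    """ % BILLING_TABLE
    row = list(client.query(query).result())[0]
    text = "Estimated cost for yesterday: %.2f USD" % (row["total"] or 0)
    requests.post(os.environ["SLACK_WEBHOOK_URL"], json={"text": text})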
Python 100 project #50: Get Audit Report on Slack
In this project, I extended the previous project “PDF to TXT”, so that the report is now posted to Slack every day.
In short: every day the Sophos XG firewall sends its security audit report (PDF) to the Python-powered server, and the server converts the PDF to text (keeping only the necessary parts) and posts the daily summary to Slack.
Output:
Code:
import base64
from io import BytesIO
from pprint import pprint
import tempfile
import aiosmtpd.controller
import asyncio
import email

import audit_reader
import slack


class CustomSMTPHandler:
    async def handle_DATA(self, server, session, envelope):
        # Parse the incoming mail and pull the attached PDF report out of it
        msg = email.message_from_string(str(envelope.content, 'utf-8'))
        for part in msg.walk():
            if part.get_content_type().startswith("application/pdf"):
                pdf_bytes = BytesIO(part.get_payload(decode=True))
                data = audit_reader.retrieve_data(pdf_bytes)
                slack.post(data, 'security_logs', envelope.mail_from)
        print('from:', envelope.mail_from)
        return '250 OK'


async def main(loop):
    handler = CustomSMTPHandler()
    server = aiosmtpd.controller.Controller(handler, hostname='XX.XX.XX.XX', port=XXXX)
    server.start()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.create_task(main(loop=loop))
    try:
        print("server running...")
        loop.run_forever()
    except KeyboardInterrupt:
        pass
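The audit_reader module is not shown in this post. As a hypothetical sketch of what audit_reader.retrieve_data() might do, here is a version using pdfminer.six; the line filtering is an assumption, since the real parsing depends on the layout of the Sophos PDF.

from pdfminer.high_level import extract_text


def retrieve_data(pdf_file):
    # Extract all text from the PDF (pdf_file is a file-like object such as BytesIO).
    text = extract_text(pdf_file)
    # Keep only non-empty lines as a rough daily summary; real filtering would
    # depend on the structure of the Sophos XG audit report.
    lines = [line for line in text.splitlines() if line.strip()]
    return "\n".join(lines)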
Python 100 project #42: Slack Bot – AWS EC2 list
Following up on the previous project, I created a Slack bot to get the EC2 instance list (across all regions) in one shot.
So now there is no need to open a terminal and run the command every time; just ask Slack with “/100p ec2 list” and the result is posted.
I used AWS API Gateway to receive the slash command from Slack, so it is easy to add more functions.
Output Example:
Here is the code:
This is the receiver code, invoked when the Slack slash command sends a POST request to API Gateway.
from base64 import b64decode
import json
import os
from urllib.parse import parse_qs
import logging

import boto3

ENCRYPTED_EXPECTED_TOKEN = "kms_base64encodedkey="

kms = boto3.client('kms')
expected_token = str(kms.decrypt(CiphertextBlob=b64decode(ENCRYPTED_EXPECTED_TOKEN))['Plaintext'], 'utf-8')

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event, context):
    req_body = event['body']
    params = parse_qs(req_body)
    print("received data...", params)

    # Verify the request really comes from Slack
    token = params['token'][0]
    if token != expected_token:
        logger.error("Request token (%s) does not match expected", token)
        raise Exception("Invalid request token")

    user = params['user_name'][0]
    command = params['command'][0]
    channel = params['channel_name'][0]
    if 'text' in params.keys():
        command_text = params['text'][0]
    else:
        command_text = ''
    response_url = params['response_url'][0]
    arg = command_text.split(' ')

    # Hand the command off to the worker Lambda via SNS
    sns = boto3.client('sns')
    SNS_CHANNEL = os.environ['SNS_CHANNEL']
    topic_arn = sns.create_topic(Name=SNS_CHANNEL)['TopicArn']
    message = {"user_name": user, "command": command, "channel": channel,
               "command_text": command_text, "response_url": response_url}
    message = json.dumps(message)
    message = json.dumps({'default': message, 'lambda': message})
    response = sns.publish(
        TopicArn=topic_arn,
        Subject='/100p',
        MessageStructure='json',
        Message=message
    )

    return {
        "text": "received command - %s . Please wait for a few seconds for the reply to be posted." % (command_text)
    }
And this is the actual code that posts the result to Slack.
import json
import sys

import boto3
import requests


def get_regions(service):
    credential = boto3.session.Session()
    return credential.get_available_regions(service)


def list_ec2_servers(region):
    credential = boto3.session.Session()
    ec2 = credential.client('ec2', region_name=region)
    instances = ec2.describe_instances()
    servers_list = []
    for reservations in instances['Reservations']:
        for instance in reservations['Instances']:
            tags = parse_keyvalue_sets(instance['Tags'])
            state = instance['State']['Name']
            servers_list.append([region, instance['InstanceId'], tags['Name'], state])
    return servers_list


def parse_keyvalue_sets(tags):
    result = {}
    for tag in tags:
        key = tag['Key']
        val = tag['Value']
        result[key] = val
    return result


def lambda_handler(event, context):
    message = event['Records'][0]['Sns']['Message']
    try:
        message = json.loads(message)
        user_name = message['user_name']
        command = message['command']
        command_text = message['command_text']
        response_url = message['response_url']
        arg = command_text.split(' ')
        if arg[0] == 'ec2':
            resp = ec2_helper(arg[1:])
        # TODO: else statement for other functions

        # if response_type is not specified, act as the same as ephemeral
        # ephemeral: response message will be visible only to the user
        slack_message = {
            'channel': '@%s' % user_name,
            # 'response_type': 'in_channel',
            'response_type': 'ephemeral',
            'isDelayedResponse': 'true',
            'text': resp
        }
        print("Send message to %s %s" % (response_url, slack_message))
        header = {'Content-Type': 'application/json'}
        response = requests.post(response_url, headers=header, data=json.dumps(slack_message))
        if response.status_code == 200:
            print("Message posted to %s" % slack_message['channel'])
    except requests.exceptions.RequestException as e:
        print(e)
    except:
        e = sys.exc_info()[0]
        print("Something wrong happened...", e)


def ec2_helper(command):
    regions = get_regions('ec2')
    if command[0] == 'list':
        region_servers = []
        for region in regions:
            servers = list_ec2_servers(region)
            region_servers.extend(servers)
        msg = ""
        for server in region_servers:
            msg += '\t'.join(server)
            msg += "\n"
    # TODO: else for other functions
    return msg
Python 100 project #42: AWS Data Post to Slack – Billing
Sometimes I forget to stop an AWS instance and am too lazy to check the running instances for a while. Then it suddenly becomes clear when I receive the billing email from AWS for the previous month.
To avoid this surprise, I created a Lambda function that posts the estimated cost for the period to Slack every morning.
Output Example:
Here is the (main handler) code:
import datetime
import logging
import os

import boto3
import requests

import slack

SLACK_CHANNEL = os.environ['SLACK_CHANNEL']

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def estimated_cost():
    # Billing metrics are only published to CloudWatch in us-east-1
    cloudwatch = boto3.client('cloudwatch', region_name='us-east-1')
    get_metric_statistics = cloudwatch.get_metric_statistics(
        Namespace='AWS/Billing',
        MetricName='EstimatedCharges',
        Dimensions=[
            {
                'Name': 'Currency',
                'Value': 'USD'
            }
        ],
        StartTime=datetime.datetime.today() - datetime.timedelta(days=1),
        EndTime=datetime.datetime.today(),
        Period=86400,
        Statistics=['Maximum']
    )
    datapoint = get_metric_statistics['Datapoints'][0]
    return datapoint['Maximum'], datapoint['Timestamp']


def lambda_handler(event, context):
    cost, timestamp = estimated_cost()
    date = timestamp.strftime('%Y-%m-%d')
    content = "Estimated cost is %s as of %s" % (cost, date)
    try:
        slack.post(content, SLACK_CHANNEL, context.function_name)
        logger.info("Message posted to %s, %s" % (SLACK_CHANNEL, content))
    except requests.exceptions.RequestException as e:
        logger.error("Request failed: %s", e)
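The custom slack module used here (and in the audit-report project above) is not included in these posts. Here is a plausible sketch of its post() helper, modelled on the chat.postMessage call in the Cisco EEM project further down; the SLACK_TOKEN environment variable is an assumption.

import os

import requests


def post(msg, channel, username):
    # Post a message to Slack via the Web API, shown as coming from the given username.
    params = {
        "token": os.environ["SLACK_TOKEN"],
        "text": msg,
        "channel": channel,
        "as_user": False,
        "username": username,
    }
    resp = requests.post("https://slack.com/api/chat.postMessage", params=params)
    return resp.status_code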
Python 100 project #41: Syslog Post to Slack – Content Filtering
Following up on the last project, I created another function so that my syslog server posts to Slack when a client request is rejected by content filtering.
Output Example:
Here is the modified syslog_server.py with the new function and some rearrangement of the existing ones:
## Reference https://gist.github.com/marcelom/4218010
## Tiny Syslog Server in Python.
##
## This is a tiny syslog server that is able to receive UDP based syslog
## entries on a specified port and save them to a file.
## That's it... it does nothing else...

## There are a few configuration parameters.
HOST, PORT = "0.0.0.0", 514
PRINT_LOG = True

# SYSLOG Notification parameter
CONTENT_FILTERING_NOTIFY = True

#
# NO USER SERVICEABLE PARTS BELOW HERE...
#

import logging
import re
import socketserver
import sys

import custom_helper.slack


class SyslogUDPHandler(socketserver.BaseRequestHandler):
    def handle(self):
        data = bytes.decode(self.request[0].strip(), encoding="utf-8")
        socket = self.request[1]
        if PRINT_LOG:
            print("%s : " % self.client_address[0], str(data.encode("utf-8")))
        if CONTENT_FILTERING_NOTIFY:
            cf_notify(data)
        logging.info(str(data.encode("utf-8")))


def cf_notify(log):
    # Post to Slack only when the log line is a content-filtering denial
    log_match = re.search(r'log_type="(.*?)".*log_subtype="(.*?)".*category="(.*?)".*url="(.*?)"', log)
    if log_match and log_match[1] == "Content Filtering" and log_match[2] == "Denied":
        category, url = log_match[3], log_match[4]
        custom_helper.slack.post(f"Content Filtering Denied: {category} - {url}", "security_logs", "HOME_SOPHOS")


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print(f"Usage: {sys.argv[0]} log_file_name")
        sys.exit(0)

    try:
        LOG_FILE = sys.argv[1]
        logging.basicConfig(level=logging.INFO, format='%(message)s', datefmt='', filename=LOG_FILE, filemode='a')
        server = socketserver.UDPServer((HOST, PORT), SyslogUDPHandler)
        server.serve_forever(poll_interval=0.5)
    except (IOError, SystemExit):
        raise
    except KeyboardInterrupt:
        print("Ctrl+C Pressed. Shutting down.")
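For illustration only, a log line shaped like the fields the regex above expects (the exact Sophos XG format may differ) would trigger the notification like this:

sample = ('device="SFW" log_type="Content Filtering" log_subtype="Denied" '
          'category="Gambling" url="http://example.com/"')
cf_notify(sample)  # posts "Content Filtering Denied: Gambling - http://example.com/"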
Python 100 project #31: Cisco IOS Login notification to Slack with EEM
It is best practice that the router console is only accessible from certain IP addresses, usually the service provider's and the internal management addresses. But from time to time, internal staff try to access the router. That is merely a tedious problem if there is a centralised management system (or any syslog server, actually) to check those activities, but it becomes a real problem when the deployment is distributed and not centralised. In this project, I used Cisco EEM along with a Python script so that any login activity is posted to the channel.
Output Example:
Here is the code:
This is the relevant EEM config on the Cisco router:
us-east-1_e-rtr-01#sh run | sec event manager
event manager applet login_success_post
 event syslog pattern "SEC_LOGIN-5-LOGIN_SUCCESS"
 action 0.0 cli command "en"
 action 1.0 regexp "([0-9]+\.[0-9]+\.[0-9]+\.[0-9])" "$_syslog_msg" match source_ip
 action 2.0 cli command "guestshell run python /bootflash/gs_scripts/login_post.py $_info_routername success $source_ip"
This is the relevant script, saved as /bootflash/gs_scripts/login_post.py on the router:
import sys

import requests

# from cred import token
token = ***


def post_slack(msg, channel, hostname):
    # Post the login notification to Slack via chat.postMessage
    headers = {"Content-type": "application/json"}
    params = {
        "token": token,
        "text": msg,
        "channel": channel,
        "as_user": False,
        "username": hostname,
        "icon_emoji": ":cisco-rtr:"
    }
    url = "https://slack.com/api/chat.postMessage"
    resp = requests.post(url, params=params, headers=headers)
    return resp.status_code


if __name__ == "__main__":
    if len(sys.argv) != 4:
        print "usage: %s 'hostname' 'fail|success' 'source_ip_address'" % sys.argv[0]
        sys.exit(0)
    post_slack('Login %s from %s' % (sys.argv[2], sys.argv[3]), "security_logs", sys.argv[1])