Secure and Easy Remote Work with Sophos UTM

I’m going to walkthrough how to setup remote access vpn in sophos UTM. This post is intended for the minimum deployment and might not be as scalable, but baseline is as below:

  • Clientless – no need to install client software on PC
  • Secure – Multifactor authentication
  • Affordable – no need for extra service nor device

As the requirement of remote access increase, IT needs to setup environment quickly, and still in cost effectively.
Sophos UTM is one of the least expensive UTM in the market, which is ready for enterprise use.

In summary, the settings follow below:

  1. configure users
  2. configure OTP
  3. (optional)configure user portal
  4. configure HTML5 VPN

First, you need to create users. This username is used for remote users to login to the portal.

We use tOTP based token this time to use Multi Factor Authentication(MFA). You just need to enable it.

We need to create HTML5 VPN Portal for every users in this case. First add “network definition” for users PC at office, I’m using IP address here, but alternatively you can use DNS name. Second add remote user, which you created at step 1, into “Allowed users” so that only the user can access each PC. And that’s all for Sophos UTM setup.

Ask users to access the URL “https://”, and they should login with the reomte user name and password which you created at step 1.

Once users logged into the portal, it should prompt users to register OTP. Users can use any tOTP based applicaiton. In my case I used Google Authenticator, which is available via playstore/applestore for free. Scan the QR code, and it should now prompt the PIN.

Once done, users need to login again. But this time the password is “password you created at step 1” + “PIN on tOTP app”(eg. secretpassword123456). users should be able to see the user portal now.

Click “HTML5 VPN Portal”, and then click the PC name to connect to.

It pops up another window showing your PC desktop. Ask users not to shutdown the PC, and ask them simply logoff or close the window.


Some UTMs require separate subscription to use clientless VPN (eg. PaloAlto), while Sophos UTM comes with most of the function built-in the box.

Please drop me a message if you encounter any problem. THanks for reading!

Python 100 project #50: Get Audit Report on Slack

In this project, I extended the previous project “PDF to TXT”, and now it’s posted to Slack every day.

So in short, every day the sophos XG firewall sends the security audit report(PDF) to the python powered server, and the server interpret the PDF into the text, (and of course it selects the necessary part only) and post the daily summary on slack.

 

Output:

 

 

 

Code:

import base64
from io import BytesIO
from pprint import pprint
import tempfile

import aiosmtpd.controller
import asyncio
import email

import audit_reader
import slack


class CustomSMTPHandler:
    async def handle_DATA(self, server, session, envelope):

        msg = email.message_from_string(str(envelope.content,'utf-8'))

        for part in msg.walk():
            if part.get_content_type().startswith("application/pdf"):
                pdf_bytes = BytesIO(part.get_payload(decode=True))
                data = audit_reader.retrieve_data(pdf_bytes)
                slack.post(data, 'security_logs', envelope.mail_from)
        print('from:', envelope.mail_from)
        return '250 OK'


async def main(loop):
    handler = CustomSMTPHandler()
    server = aiosmtpd.controller.Controller(handler,hostname='XX.XX.XX.XX', port=XXXX)
    server.start()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.create_task(main(loop=loop))
    try:
        print("server running...")
        loop.run_forever()
    except KeyboardInterrupt:
        pass

 

Python 100 project #49: PDF to Text Converter

There are several file format which looks user friendly but it is difficult to digest in data process. One of them is PDF. It has lots of contents inside, hence it’s usually very tough to get information out of it programatically.

This time, I used Sophos XG Firewall Daily Executive Report PDF to retrieve Hardware info.

 

Output:

$ python soreader.py 
{"CPU Usage": [{"idle_average": 93.98}, {"idle_min": 28.36}]}
{"Memory Usage": [{"used_min": 1.53}, {"used_average": 1.59}, {"used_max": 1.66}, {"total": "1.95"}]}

 

Code:

from itertools import izip_longest
import json
import re

import pdftotext

PARAMS = [
    "CPU Usage", "Memory Usage", "Disk Usage"
    ]


def retrieve_data(pdf):
    result_dict = {}
    # Iterate over all the pages
    for page in pdf:
        first_sentence = re.match(r"(.*?)\n", page)
        if first_sentence:
            param = first_sentence.group(1)
            if param in PARAMS:
                matched_obj = re.search(r"\n((.*?)\n){3,4}", page)
                matched_txt = matched_obj.group(0)
                data_list = [ cell.split() for cell in matched_txt.split("\n") ]
                result_dict[param] = data_list
    
    return result_dict


def cpu_usage_idle_parser(table):
    # this function just return idle parameter(MIN, AVERAGE) only.
    for row in table:
        if 'Idle' in row:
            raw_data_list = row[row.index('Idle')+1:]
            data_list = list(map(lambda x: float(x[:-1]), raw_data_list))
            # remove largest data, which should be the MAX value
            data_list.remove(max(data_list))
            idle_average = max(data_list)
            idle_min = min(data_list)
            
            result = {'CPU Usage': [
                {'idle_average': idle_average},
                {'idle_min': idle_min},
                ]}
                
            return json.dumps(result)
    return None


def memory_usage_parser(table):
    # this function returns memory utilization(used: MIN, AVERAGE, MAX).
    for row in table:
        if 'Used' in row:
            data_list = []
            for column in row:
                try:
                    formated_data = float(column)
                    data_list.append(formated_data)
                except ValueError:
                    pass
            used_min, used_avg, used_max = sorted(data_list)
        elif 'Total' in row:
            total_memory = row[row.index('Total')+1]
        
    result = {'Memory Usage': [
        {'used_min': used_min},
        {'used_average': used_avg},
        {'used_max': used_max},
        {'total': total_memory},
        ]}
            
    return json.dumps(result)


if __name__ == "__main__":
    # Load PDF file
    with open("testreport.pdf", "rb") as f:
        pdf = pdftotext.PDF(f)

    data = retrieve_data(pdf)
    
    print(cpu_usage_idle_parser(data['CPU Usage']))
    
    print(memory_usage_parser(data['Memory Usage']))

 

Python 100 project #41: Syslog Post to Slack – Content Filtering

Following up the last project, I created another function so that my syslog server can post slack upon rejection of client request due to the content filtering.

 

Output Example:

 

Here is the new modified syslog_server.py with new function and some rearrangement of the function:

## Reference https://gist.github.com/marcelom/4218010

## Tiny Syslog Server in Python.
##
## This is a tiny syslog server that is able to receive UDP based syslog
## entries on a specified port and save them to a file.
## That's it... it does nothing else...
## There are a few configuration parameters.

HOST, PORT = "0.0.0.0", 514
PRINT_LOG = True

# SYSLOG Notification parameter
CONTENT_FILTERING_NOTIFY = True

#
# NO USER SERVICEABLE PARTS BELOW HERE...
#

import logging
import re
import socketserver
import sys

import custom_helper.slack


class SyslogUDPHandler(socketserver.BaseRequestHandler):

    def handle(self):
        data = bytes.decode(self.request[0].strip(), encoding="utf-8")
        socket = self.request[1]
        if PRINT_LOG:
            print("%s : " % self.client_address[0], str(data.encode("utf-8")))
        if CONTENT_FILTERING_NOTIFY:
            cf_notify(data)
        logging.info(str(data.encode("utf-8")))


def cf_notify(log):
    log_match = re.search(r'log_type="(.*?)".*log_subtype="(.*?)".*category="(.*?)".*url="(.*?)"', log)
    if log_match[1] == "Content Filtering" and log_match[2] == "Denied":
        category, url = log_match[3], log_match[4]
        custom_helper.slack.post(f"Content Filtering Denied: {category} - {url}", "security_logs", "HOME_SOPHOS")


if __name__ == "__main__":

    if len(sys.argv) != 2:
        print(f"Usage: {sys.argv[0]} log_file_name")
        sys.exit(0)

    try:
        LOG_FILE = sys.argv[1]
        logging.basicConfig(level=logging.INFO, format='%(message)s', datefmt='', filename=LOG_FILE, filemode='a')
        server = socketserver.UDPServer((HOST,PORT), SyslogUDPHandler)
        server.serve_forever(poll_interval=0.5)
    except (IOError, SystemExit):
        raise
    except KeyboardInterrupt:
        print ("Crtl+C Pressed. Shutting down.")

 

Python 100 project #34: Sophos XG API

I’m using Sophos Firewall virtual appliance at home. The reason is quite simple, it’s free of charge. There are numbers of opensource firewall, but most of them lacks essential features(eg. AntiVirus, SSL inspection). Sophos is providing XG firewall VM for home use, and you can claim home license, which is technically has the same functionality of the business use, just less performance. In this project, I use python to access sophos XG firewall so that it can be monitored/configured to secure home network.

 

Output Example:

$ python sophos_base.py 
OrderedDict([('Response',
              OrderedDict([('@APIVersion', '1700.1'),
                           ('Login',
                            OrderedDict([('status',
                                          'Authentication Successful')])),
                           ('SecurityPolicy',
                            [OrderedDict([('@transactionid', ''),
                                          ('Name',
                                           'Auto added firewall policy for '
                                           'MTA'),
                                          ('Description',
                                           'This rule was added automatically '
                                           'by SFOS MTA. However you could '
                                           'edit this policy based on network '
                                           'requirement.'),
                                          ('IPFamily', 'IPv4'),
                                          ('Status', 'Enable'),
                                          ('Position', 'Top'),
                                          ('PolicyType', 'PublicNonHTTPPolicy'),
                                          ('MatchIdentity', 'Disable'),
                                          ('PublicNonHTTPBasedPolicy',
                                           OrderedDict([('ScanSMTP', 'Enable'),
                                                        ('ScanIMAP', 'Disable'),
                                                        ('ScanIMAPS',
                                                         'Disable'),
                                                        ('ScanPOP3', 'Disable'),
                                                        ('ScanSMTPS', 'Enable'),
                                                        ('ScanPOP3S',
                                                         'Disable')])),
                                          ('IntrusionPrevention', 'None'),
                                          ('TrafficShappingPolicy', 'None'),
                                          ('LogTraffic', 'Disable'),
                                          ('ApplyNAT', 'CustomNatPolicy'),
                                          ('OverrideGatewayDefaultNATPolicy',
                                           'Disable'),
                                          ('SourceSecurityHeartbeat',
                                           'Disable'),
                                          ('MinimumSourceHBPermitted',
                                           'No Restriction'),
                                          ('DestSecurityHeartbeat', 'Disable'),
                                          ('MinimumDestinationHBPermitted',
                                           'No Restriction'),
                                          ('OutboundAddress', 'MASQ')]),
                             OrderedDict([('@transactionid', ''),
                                          ('Name', '#Default_Network_Policy'),
                                          ('Description', None),
                                          ('IPFamily', 'IPv4'),
                                          ('Status', 'Enable'),
                                          ('Position', 'After'),
                                          ('PolicyType', 'Network'),
                                          ('After',
                                           OrderedDict([('Name',
                                                         'Auto added firewall '
                                                         'policy for MTA')])),
                                          ('SourceZones',
                                           OrderedDict([('Zone', 'LAN')])),
                                          ('DestinationZones',
                                           OrderedDict([('Zone', 'WAN')])),
                                          ('Schedule', 'All The Time'),
                                          ('Action', 'Accept'),
                                          ('LogTraffic', 'Disable'),
                                          ('MatchIdentity', 'Disable'),
                                          ('DSCPMarking', '-1'),
                                          ('ApplicationControl', 'None'),
                                          ('ApplicationBaseQoSPolicy',
                                           'Revoke'),
                                          ('WebFilter', 'Default Policy'),
                                          ('WebCategoryBaseQoSPolicy',
                                           'Revoke'),
                                          ('IntrusionPrevention',
                                           'lantowan_general'),
                                          ('TrafficShappingPolicy', 'None'),
                                          ('ApplyNAT', 'CustomNatPolicy'),
                                          ('OverrideGatewayDefaultNATPolicy',
                                           'Disable'),
                                          ('PrimaryGateway', None),
                                          ('OutboundAddress', 'MASQ'),
                                          ('BackupGateway', None),
                                          ('ScanHTTP', 'Enable'),
                                          ('ScanHTTPS', 'Disable'),
                                          ('Sandstorm', 'Disable'),
                                          ('ScanFTP', 'Disable'),
                                          ('SourceSecurityHeartbeat',
                                           'Disable'),
                                          ('MinimumSourceHBPermitted',
                                           'No Restriction'),
                                          ('DestSecurityHeartbeat', 'Disable'),
                                          ('MinimumDestinationHBPermitted',
                                           'No Restriction')])])]))])

 

Here is the code:

from dicttoxml import dicttoxml
import xmltodict
import requests

from data_source import sophos_credentials

def get_config(scope):

    param_dict = sophos_credentials.SEGFW_INT

    param_dict['GET'] = {
        scope: ""}

    xml = str(dicttoxml(param_dict, custom_root="Request", attr_type=False), 'utf-8')

    url = f"https://"firewall-ip":4444/webconsole/APIController?reqxml={xml}"

    resp = requests.get(url, verify=False)

    return resp.status_code, xmltodict.parse(resp.text)


from pprint import pprint

ret_code, ret_dict = get_config("SecurityPolicy")

if ret_code == 200:
    pprint(ret_dict)

 

API on Sophos: Sophos UTMを使ってみる (REST)

このエントリーはSophos UTMでのRESTの概要と、REST APIを使ってSophos UTMから情報を取得するところまで。

API概要


参考URL:Sophos UTM RESTful API

 

ラボ環境を作る


AWSでSophos UTMを立ち上げる。

    1. FWを立ち上げる
    2. sshでFWにアクセス
    3. RESTでアクセスできるように設定。

最初、キーが手元になかったので、WebGUIでSSH鍵認証を無効(パスワード認証)にしましたが、今回起動したUTM(version 9.506)ではパスワード認証は”Access Denied”となってしまいうまくいきませんでした。SophosUTMではSSHパスワード認証に難があるようで、フォーラムを見てもバージョンごとに修復、再発を繰り返しているようです。

$ssh -i aws_eu-ce1.pem loginuser@*******

Using username "loginuser".

Sophos UTM
(C) Copyright 2000-2017 Sophos Limited and others. All rights reserved.
Sophos is a registered trademark of Sophos Limited and Sophos Group.
All other product and company names mentioned are trademarks or registered
trademarks of their respective owners.

For more copyright information look at /doc/astaro-license.txt
or http://www.astaro.com/doc/astaro-license.txt

NOTE: If not explicitly approved by Sophos support, any modifications
      done by root will void your support.

loginuser@sophons9_eu-central-1:/home/login > su -
Password:
sophons9_eu-central-1:/root #
sophons9_eu-central-1:/root # cc set webadmin rest_api 1
1
sophons9_eu-central-1:/root #

 

APIを使ってみる


APIの構成は、”https://<FWのIPアドレス>:<ポート番号(デフォルトは4444)>/api/definitions”からたどることが出来る。下の例ではethernetのインタフェース情報は”/objects/interface/ethernet/”で取得できることがわかる

Sophos UTMのAPI認証は、ユーザー名とパスワードをリクエストと一緒に送る(BASIC認証)と、事前にTOKENを発行しておく二種類の方法がある。今回はBASIC認証を使う。

import requests
from pprint import pprint

def main():
    
    source = "https://******.eu-central-1.compute.amazonaws.com:4444/api"
    path = "/objects/interface/ethernet/"
    
    url = source + path
    
    headers = {"content-Type":"application/json", "Accept":"application/json", }
    
    r = requests.get(url, headers=headers, verify=False, auth=('api-user', '******'))
    
    return r.json()


if __name__ == '__main__':
    pprint(main())

これを実行するとこんな感じで情報が返ってくる

ec2-user:~/environment/sophos_api $ python3 get-interface.py 
[{'_locked': '',
  '_ref': 'REF_DefaultInternal',
  '_type': 'interface/ethernet',
  'additional_addresses': [],
  'bandwidth': 100000000,
  'comment': 'Auto-created on installation',
  'inbandwidth': 0,
  'itfhw': 'REF_ItfHwDefaultInternal',
  'link': True,
  'mtu': 9001,
  'mtu_auto_discovery': True,
  'name': 'Internal',
  'outbandwidth': 0,
  'primary_address': 'REF_ItfParamsDefaultInternal',
  'proxyarp': False,
  'proxyndp': False,
  'status': True}]
ec2-user:~/environment/sophos_api $