Dynamic Update for OCI's Ingress Firewall Rules

Using Oracle's OCI API to update firewall ingress rules dynamically, to allow for dynamic IP address lockdown.

Problem: I want to only open access to some services on my OCI Instance(s) to my home IP address (pihole for example). However, my home IP is dynamic, changing every week or so or whenever my router restarts.

Solution: Have my router do an HTTP GET on a publicly reachable website running on an instance, which lets the site record my home public IP. A script runs periodically on the instance and, if the IP has changed, updates the OCI ingress firewall rules (using the OCI Python API) to allow that IP address.

Warnings:

  1. Back up your rules first. Since the API doesn't let us modify rules in place, we need to replace them all with updated versions, so if something goes wrong you could end up with an empty ingress rule set and be locked out.
  2. Have a backup way into your instances in case something like the above happens - see the Oracle Free Tier VPS - Best Practices post (e.g. use the magical Tailscale, and/or have a local console login ready and tested).
  3. Don't publicise the URL where your incoming PHP script lives (or the key) from step 2 below. This should really be more secure (https, a key exchange) to avoid risk of abuse / denial of service attacks - left as an exercise for the reader :-)
  4. Caveat Emptor. Should be obvious, but use anything here at your own risk. This works for me, and has done so flawlessly for some time, but the script was thrown together quickly, APIs change - YMMV.

Step 1 - Router Ping

I have a MikroTik router which makes this nice and easy. I initially wrote a RouterOS script to only ping my site when the router's assigned public IP address changed, but shortly after a reboot / IP change Internet access was unreliable so pings could go missing - so the script now pings the website every minute.

{
:local posterURL "http://aa.bb.cc.dd/poster.php?key=12345";

# GET to webserver watching for calls, so it gets current public IP
/tool fetch url=$posterURL keep-result=no;
}

Step 2 - Script to Record Public IP

Save the IP address of the client to a local file using a simple PHP script served by a webserver on an instance (Apache, Nginx, your choice). Simple double-check for a key so nobody else can potentially DoS us.

<?php
        // REMOTE_ADDR is set by the webserver for every request
        $ipaddress = $_SERVER['REMOTE_ADDR'];
        echo "Client IP $ipaddress <br>";

        // compare the whole key value, so that e.g. key=123456 is not accepted
        $key = $_GET['key'] ?? '';

        if ($key === '12345') {
                $myfile = fopen("/abc/xyz/currentip.txt", "w") or die("Unable to open file. Argh.");
                fwrite($myfile, $ipaddress);
                fclose($myfile);

                echo "Done.<br>";
        } else {
                echo "Undone.<br>";
        }
?>

Step 3 - Use OCI API to Update Firewall Rules

Import the libraries we need. The oci one is key and you'll need to install it first (see the quickstart reference docs link below). Also set up logging, with the level at debug since we want lots of detail, in the output format I prefer:

import oci
import json
import os
from datetime import datetime
import logging
from os import path

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')


Check if we have an IP address saved from the home router (by the PHP script above). If one exists and it is different from the last address we used for an update, then proceed. Otherwise exit now.

# first check for IP posted from home router, if none then exit.
ipFilename = '/xxx/xxx/currentip.txt'

savedIP = '10.0.0.123'  # safe fallback JIC
try:
  with open(ipFilename) as file:
    savedIP = file.read().strip()
    logging.info(f'Got IP: {savedIP}')
except OSError:
  # file missing or unreadable: nothing new has been posted
  logging.info('No IP file, exiting peacefully.')
  exit()

lastIPFilename = '/tmp/poster-lastip.txt'

# have a posted home IP, check if different from last seen/update
if path.exists(lastIPFilename):
  # contains last IP we updated the FW rules to, so if it's the same as the one we just got, we can ignore and no update needed.
  with open(lastIPFilename) as lastIPfile:
    lastIP = lastIPfile.read().strip()
    logging.info(f'Got last update IP: {lastIP}')
    if (lastIP == savedIP):
      # is same as last update, so no need to do another update
      os.remove(ipFilename)
      logging.info('Same IP as last update, exiting.')
      exit()
    else:
      logging.info(f'New IP {savedIP} differs from last update IP {lastIP}, carrying on.')
else:
  logging.info('No last update IP saved, so carrying on.')
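The contents of the IP file come from a public web request, so it may be worth checking them before they end up in a firewall rule. The sketch below uses the standard library's ipaddress module; the function name parse_home_ip is made up for illustration, and the assumption that only a public IPv4 address is acceptable comes from the '/32' suffix used in the rules later on.

```python
import ipaddress


def parse_home_ip(text):
    """Return text as a normalised IPv4 string, or None if it is not usable.

    Hypothetical helper, not part of the original script.
    """
    try:
        ip = ipaddress.ip_address(text.strip())
    except ValueError:
        # not an IP address at all (empty file, garbage, injected text)
        return None
    # Assumption: only a public IPv4 address makes sense as a '/32' source.
    if ip.version != 4 or not ip.is_global:
        return None
    return str(ip)
```

In the script this could be called on savedIP straight after reading the file, exiting if it returns None.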

Init the main oci object, reading config from the OCI config file usually saved locally in ~/.oci along with your private key (xxx.pem), and set values we'll need later:

config = oci.config.from_file()

# Initialize service client with default config file
core_client = oci.core.VirtualNetworkClient(config)

# consts.
network_compartment_name = "ManagedCompartmentForPaaS"
vcn_cidr_range = "10.0.0.0/16"
vcn_subnet_cidr_range = "10.0.0.0/24"

# hardcoded specific values - can get most programmatically, but easier to hardcode for now.
vcn_id = "ocid1.vcn.oc1.XXXXXX-YYYYYYYYyjkpeva"
vcn_name = "vcn-XXXX-YYYY"
# hardcode security list name
sl_name = "Default Security List for vcn-XXXX-YYYY"
vcn_sl_id = "ocid1.securitylist.oc1.AAAAAA-BBBBBBBBBBmegoq"

Get the existing ingress rules (and dump them to disk, just in case, before we modify them). The API doesn't allow us to modify existing individual rules; instead we need to replace them all with our modified versions:

# get the current security list data (ingress rules since we'll be updating those)
get_security_list_response = core_client.get_security_list(
    security_list_id=vcn_sl_id)
current_vcn_sl=json.loads(str(get_security_list_response.data.ingress_security_rules))

# write details before modifying... JIC
with open('vcn_ingress_list_before.json', "w") as f:
   json.dump(current_vcn_sl, f, ensure_ascii=False, indent=4)
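Because the dump above always writes to the same filename, each run overwrites the previous backup. A minimal sketch of a timestamped variant follows; the function name backup_rules and the filename pattern are assumptions for illustration, not part of the original script.

```python
import json
import os
from datetime import datetime


def backup_rules(rules, directory="."):
    """Write rules to a new timestamped JSON file and return its path.

    Hypothetical helper; the filename pattern is an arbitrary choice.
    """
    stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    backup_path = os.path.join(directory, f"vcn_ingress_list_{stamp}.json")
    with open(backup_path, "w") as f:
        json.dump(rules, f, ensure_ascii=False, indent=4)
    return backup_path
```

Calling backup_rules(current_vcn_sl) would then keep one file per update, which gives something to restore from if a later run goes wrong.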

Define the rules we wish to update, making any required changes - in this case, we only change the IP address. In this example, I'm updating 2 rules for DNS via TCP and UDP respectively. To see how existing rules are represented, inspect the populated "current_vcn_sl" variable above which will contain all your existing rules and their config parameters:

# updated rules that we want to replace/update existing ones
sl_update_rules=[
    {
      "description": "DNS TCP",
      "icmp_options": None,
      "is_stateless": False,
      "protocol": "6",
      "source": '' + savedIP + '/32',
      "source_type": "CIDR_BLOCK",
      "tcp_options": {
        "destination_port_range": {
          "max": 53,
          "min": 53
        },
        "source_port_range": None
      },
      "udp_options": None
    },

    {
      "description": "DNS UDP",
      "icmp_options": None,
      "is_stateless": False,
      "protocol": "17",
      "source": '' + savedIP + '/32',
      "source_type": "CIDR_BLOCK",
      "tcp_options": None,
      "udp_options": {
        "destination_port_range": {
          "max": 53,
          "min": 53
        },
        "source_port_range": None
      }
    }

    ]
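The two rule dictionaries above differ only in description, protocol, and which options key is filled in, so they could be produced by a small helper. This is a sketch under that assumption; make_port_rule is a made-up name and 203.0.113.7 is a documentation-range address standing in for savedIP.

```python
def make_port_rule(description, protocol, source_ip, port):
    """Build one single-port ingress rule dict in the shape used above.

    Hypothetical helper; protocol is "6" for TCP or "17" for UDP.
    """
    port_options = {
        "destination_port_range": {"max": port, "min": port},
        "source_port_range": None,
    }
    return {
        "description": description,
        "icmp_options": None,
        "is_stateless": False,
        "protocol": protocol,
        "source": f"{source_ip}/32",
        "source_type": "CIDR_BLOCK",
        "tcp_options": port_options if protocol == "6" else None,
        "udp_options": port_options if protocol == "17" else None,
    }


# placeholder IP for illustration; the script would pass savedIP instead
example_rules = [
    make_port_rule("DNS TCP", "6", "203.0.113.7", 53),
    make_port_rule("DNS UDP", "17", "203.0.113.7", 53),
]
```

Adding another locked-down service would then be one extra line rather than another fifteen-line dictionary.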

Build a new list of every rule (since we need to replace them all, even if they are not changing). Build a list of the rules we don't want to change, and append our changed rules defined above:

# conditional update - skip existing rules with the same description
# keep every current_vcn_sl item, *unless* its exact "description" also exists in sl_update_rules, then skip it as we'll be replacing it.
updateRuleDescriptions = {rule['description'] for rule in sl_update_rules}

logging.info(f'Rules to update: {sorted(updateRuleDescriptions)}')

# existing rules are matched on the exact description; a plain list avoids building JSON by hand,
# and an exact match means rules with an empty or partially matching description are not dropped by accident
newUpdateRules = []
for rule in current_vcn_sl:
  if rule.get('description') in updateRuleDescriptions: # a description of None never matches
    # we're going to replace this one with an updated version, so skip it
    logging.info(f"Skipping existing rule as will update/replace: {rule['description']}")
  else:
    newUpdateRules.append(rule)
    logging.info(f"Keeping existing rule unmodified: {rule.get('description')}")

# we now have a list of all existing rules, besides those we want to update - so append the new ones
newUpdateRules += sl_update_rules
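Since a bad list here replaces the whole rule set, a sanity check before calling the API may be worthwhile. The sketch below assumes rules are meant to be matched on their exact description; check_rules_safe_to_apply is a hypothetical name, and the check is deliberately crude (it only compares counts).

```python
def check_rules_safe_to_apply(existing, merged, updated):
    """Raise ValueError if merged looks like it would lose rules.

    Hypothetical guard: existing is the current rule list, updated the
    replacement rules, merged the list about to be sent to the API.
    """
    if not merged:
        raise ValueError("refusing to apply an empty ingress rule list")
    updated_descriptions = {r["description"] for r in updated}
    # rules that should survive untouched: no exact description match
    kept = [r for r in existing if r.get("description") not in updated_descriptions]
    expected = len(kept) + len(updated)
    if len(merged) != expected:
        raise ValueError(f"expected {expected} rules after merge, got {len(merged)}")
```

In the script this would be called as check_rules_safe_to_apply(current_vcn_sl, newUpdateRules, sl_update_rules), letting the exception stop the run before anything is changed.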

Define a function to help us update the entire security list in the next step:

##################################################################
# API to update the security list doesn't expect a JSON object,
#   so use these functions to convert each rule dict into the IngressSecurityRule model the API expects;
#   makeIngressRules is called once for every rule
########################
def makePortRange(r):
    # r is a {'max': ..., 'min': ...} dict, or None when the rule doesn't restrict that range
    if r is None:
        return None
    return oci.core.models.PortRange(max=r['max'], min=r['min'])

def makeIngressRules(t):
    if t.get('tcp_options') is None:
        theseTcpOptions = None
    else:
        # carry over both port ranges, so rules we keep are not silently widened
        theseTcpOptions = oci.core.models.TcpOptions(
                    destination_port_range=makePortRange(t['tcp_options'].get('destination_port_range')),
                    source_port_range=makePortRange(t['tcp_options'].get('source_port_range'))
                    )

    if t.get('udp_options') is None:
        theseUdpOptions = None
    else:
        theseUdpOptions = oci.core.models.UdpOptions(
                    destination_port_range=makePortRange(t['udp_options'].get('destination_port_range')),
                    source_port_range=makePortRange(t['udp_options'].get('source_port_range'))
                    )

    if t.get('icmp_options') is None:
        theseIcmpOptions = None
    else:
        theseIcmpOptions = oci.core.models.IcmpOptions(
                        type=t['icmp_options']['type'],
                        code=t['icmp_options']['code']
                        )

    return oci.core.models.IngressSecurityRule(
        source=t['source'],
        icmp_options=theseIcmpOptions,
        protocol=t['protocol'],
        source_type=t['source_type'],
        is_stateless=t['is_stateless'],
        tcp_options=theseTcpOptions,
        udp_options=theseUdpOptions,
        description=t.get('description')
        )

Use the above function to update the security list with the complete list of all ingress rules, unchanged and changed:

updateReturn = core_client.update_security_list(
    security_list_id=vcn_sl_id,
    update_security_list_details=oci.core.models.UpdateSecurityListDetails(
        display_name=sl_name,
        ingress_security_rules=[makeIngressRules(t) for t in newUpdateRules]
))

Finally, tidy up: delete the recorded IP file and save the new IP address for comparison on the next run.

if (updateReturn.status == 200):
  # delete IP file
  logging.info("Deleting IP file")
  try:
    os.remove(ipFilename)
  except OSError:
    logging.info("Failed to remove IP file.")

  # record the IP just applied, so next time we can check any new IPs against this, and only update FW if they differ
  logging.info('Updating last change IP file')
  try:
    with open(lastIPFilename, "w") as updateFile:
      updateFile.write(savedIP)
      logging.info("Updated last updated IP file.")
  except OSError:
    logging.error("Failed to update last updated IP file")

else:
  logging.info("Final status not 200, not deleting IP file or updating last updated file.")
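The last-IP file is what stops the script from re-applying the same rules every minute, so a half-written file would cause needless updates. One possible hardening, sketched below, is to write to a temporary file and rename it into place; write_text_atomically is a made-up name, and the atomicity of the rename is an assumption that holds on POSIX filesystems when both files are in the same directory.

```python
import os
import tempfile


def write_text_atomically(target_path, text):
    """Write text to target_path so readers never see a half-written file.

    Hypothetical helper, not part of the original script.
    """
    directory = os.path.dirname(os.path.abspath(target_path))
    # temp file in the same directory, so the rename stays on one filesystem
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".tmp-ip-")
    try:
        with os.fdopen(fd, "w") as tmp:
            tmp.write(text)
        os.replace(tmp_path, target_path)
    except OSError:
        # clean up the temp file, then let the caller see the error
        os.unlink(tmp_path)
        raise
```

In the script this would stand in for the open/write pair, e.g. write_text_atomically(lastIPFilename, savedIP).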

Step 4 - Schedule the Script to Run Periodically

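A cron job is probably easiest. To be run directly like this, the script needs a Python shebang line (e.g. #!/usr/bin/env python3) and execute permission. Something like: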

# every minute, look for new IP address file and update FW rules
* * * * *       /home/ubuntu/XXX/YYY.py >> /home/ubuntu/XXX/YYY.log 2>&1
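With a one-minute schedule, a slow API call could in principle leave one run still going when cron starts the next. A common way to rule that out is an advisory file lock taken at the top of the script. The sketch below assumes a Linux host (it uses the Unix-only fcntl module); try_lock and the lock path in the usage note are made-up names.

```python
import fcntl


def try_lock(lock_path):
    """Return an open, locked file object, or None if another run holds the lock.

    Hypothetical helper; relies on flock(), so Unix-like systems only.
    """
    lock_file = open(lock_path, "w")
    try:
        # non-blocking exclusive lock: fails at once if already held
        fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        lock_file.close()
        return None
    # caller must keep this object alive; closing it releases the lock
    return lock_file
```

At the start of the script, something like lock = try_lock('/tmp/poster-update.lock') followed by an early exit when lock is None would make overlapping runs skip quietly.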

References