In my previous post, I described how to set up a netbox instance with ansible. In this post, I walk through using ansible scripts to populate your netbox.

Step 1: Prepare your scripts

You’re going to need three scripts and a couple of requirements file to be able to populate your netbox semi-automagically.

The scripts assume that you will provide your netbox URL and token via environment variable. You can create an env file taht would look like this:

NETBOX_API=http://netbox.yournetwork.local:8000
NETBOX_API_KEY=aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa

The final result once you’re through with this step should look something like this:

.
├── .env
├── filter_plugins
│   └── netbox_listen_diff.py
├── netbox_collect.yml
├── netbox_inventory.yml
├── requirements.txt
└── requirements.yml

Here’s a description of each along with the code:

Requirements files

You’ll need a requirements.txt file for your dependencies, to be executed with pip install -r requirements.txt. Here it is:

ansible==11.9.0
ansible-compat==25.8.1
ansible-core==2.18.8
dnspython==2.8.0
pynetbox==7.5.0

This following file simply contains the dependency for netbox. To use it, you’ll run ansible-galaxy install -r requirements.yml.

collections:
  - name: netbox.netbox

Inventory script

This makes use of the netbox nb_inventory to create an inventory from existing assets in your netbox. Save it as netbox_inventory.yml

plugin: netbox.netbox.nb_inventory
config_context: false
group_by:
  - device_roles

Filter plugin

This is a plugin script written in python that will allow you to parse and diff services. Save it in a subdirectory called filter_plugins with a name like netbox_listen_diff.py:

# -*- coding: utf-8 -*-
from __future__ import annotations
import ipaddress
from typing import Any, Dict, Iterable, List, Tuple, Set
from ansible.utils.display import Display
display = Display()

def _ipstr(addr: str) -> str:
    if not addr:
        return addr
    if "/" in addr:
        addr = addr.split("/", 1)[0]
    return str(ipaddress.ip_address(addr))

def _is_wildcard(addr: str) -> bool:
    if addr == "*":
        return True
    try:
        ip = ipaddress.ip_address(addr)
        return ip.is_unspecified  # 0.0.0.0 or ::
    except ValueError:
        return addr in ("0.0.0.0", "::")

def normalize_tcp_listen(tcp_listen: Iterable[Dict[str, Any]], primary_ip: str) -> List[Dict[str, Any]]:
    """
    Convert Ansible tcp_listen facts to a JSON/YAML-safe list of dicts:
      [{'ip': <primary_ip>, 'protocol': 'tcp', 'port': <int>}, ...]
    Wildcard binds (0.0.0.0/::) are mapped to the host's primary_ip.
    Listens bound to other local IPs are ignored (by design).
    """
    out: List[Dict[str, Any]] = []
    p_ip = _ipstr(primary_ip)
    for entry in tcp_listen or []:
        name = entry.get("name")
        port = entry.get("port")
        addr = entry.get("address") or entry.get("ip")
        if "%" in addr:
            addr = addr.split( "%")[0]
        if addr.startswith("127.0.0") or addr == "::":
            continue
        if port is None or addr is None:
            continue
        try:
            port = int(port)
        except Exception:
           continue
        if _is_wildcard(addr) or _ipstr(addr) == p_ip:
            out.append({"ip": p_ip, "protocol": "tcp", "port": port, "name": name})
    return out

def normalize_netbox_services(services: Iterable[Dict[str, Any]], primary_ip: str) -> List[Dict[str, Any]]:
    """
    Convert NetBox services to a list of dicts like above, filtered to this host's primary_ip and TCP only.
    NetBox 4.3.x service schema: protocol, ports[], ipaddresses[] (each has 'address' like 'A.B.C.D/nn' or '.../display')
    """
    out: List[Dict[str, Any]] = []
    p_ip = _ipstr(primary_ip)

    for svc in services or []:
        service_description = svc.get("value")
        proto = str(service_description.get("protocol").get("value")).lower()
        if proto != "tcp":
            continue
        ports = service_description.get("ports") or []
        ipas = service_description.get("ipaddresses") or []
        name = service_description.get("name")

        for port in ports:
            try:
                port = int(port)
            except Exception:
                continue
            out.append({"ip": p_ip, "protocol": "tcp", "port": port, "name": name})
    return out

def _to_tuple_set(items: Iterable[Any]) -> Set[Tuple[str, str, int, str]]:
    """
    Robustly convert a list of dicts/tuples to a set of (ip, protocol, port).
    Ignores strings (e.g., AnsibleUnsafeText) gracefully.
    """
    s: Set[Tuple[str, str, int, str]] = set()
    for x in items or []:
        try:
            if isinstance(x, dict):
                ip = _ipstr(x.get("ip") or x.get("address") or "")
                proto = str(x.get("protocol", "")).lower()
                port = int(x.get("port"))
                name = str(x.get("name"))
                if ip and proto and port is not None:
                    s.add((ip, proto, port, name))
            elif isinstance(x, (list, tuple)) and len(x) == 4:
                ip, proto, port, name = x
                ip = _ipstr(str(ip))
                proto = str(proto).lower()
                port = int(port)
                s.add((ip, proto, port, name))
            else:
                # ignore strings/unknowns
                pass
        except Exception:
            continue
    return s

def _quads_to_dicts(quads: Iterable[Tuple[str, str, int, str]]) -> List[Dict[str, Any]]:
    return [{"ip": ip, "protocol": proto, "port": int(port), "name": name} for (ip, proto, port, name) in quads]

def diff_listens(netbox_items: Iterable[Any], host_items: Iterable[Any]) -> Dict[str, Any]:
    """
    Accepts any JSON/YAML-safe sequences (lists of dicts/tuples).
    Converts to sets internally to compute:
      matched, missing_on_host, unexpected_on_host
    Returns lists of dicts (JSON/YAML friendly).
    """
    net_set = _to_tuple_set(netbox_items)
    host_set = _to_tuple_set(host_items)

    matched = sorted(list(net_set & host_set))
    missing_on_host = sorted(list(net_set - host_set))
    unexpected_on_host = sorted(list(host_set - net_set))

    display.display(str(matched))
    display.display(str(missing_on_host))
    display.display(str(unexpected_on_host))

    return {
        "matched": _quads_to_dicts(matched),
        "missing_on_host": _quads_to_dicts(missing_on_host),
        "unexpected_on_host": _quads_to_dicts(unexpected_on_host),
    }

class FilterModule(object):
    def filters(self):
        return {
            "normalize_tcp_listen": normalize_tcp_listen,
            "normalize_netbox_services": normalize_netbox_services,
            "diff_listens": diff_listens,
        }

Collection playbook

You now need a collection playbook. This connects to each host in the inventory, catalogs the ports, and then uses the filter plugin to diff and selectively add them to netbox. Let’s call this netbox_collect.yml.

---
- name: Collect the inventory of all my assets
  hosts: all
  vars:
    netbox_url: "{{ lookup('ansible.builtin.env', 'NETBOX_API') }}"
    netbox_token: "{{ lookup('ansible.builtin.env', 'NETBOX_API_KEY') }}"

  pre_tasks:
    - name: Ensure network facts (for tcp_listen)
      ansible.builtin.setup:
        gather_subset:
          - network
      when: ansible_facts.tcp_listen is not defined

    - name: Determine primary IP (prefer IPv4 then IPv6)
      ansible.builtin.set_fact:
        primary_ip: >-
          {{ (ansible_facts.get('default_ipv4', {}).get('address'))
             | default(ansible_facts.get('default_ipv6', {}).get('address'), true) }}

    - name: Sanity check primary IP
      ansible.builtin.assert:
        that:
          - primary_ip is string
          - primary_ip | length > 0
        fail_msg: "Could not determine primary IP (default_ipv4/default_ipv6)."

  tasks:
    - name: Gather info about listening ports
      become: true
      become_user: root
      community.general.listen_ports_facts:
        command: 'ss'

    - name: Fetch NetBox services filtered by exact IP (server-side) via nb_lookup
      vars:
        _nb_args:
          api_endpoint: "{{ netbox_url }}"
          token: "{{ netbox_token }}"
          validate_certs: true
          # NetBox 4.3.6 supports filtering services by ipaddress
          api_filter: "ipaddress={{ primary_ip }}&limit=0"
      ansible.builtin.set_fact:
        nb_services_raw: >-
          {{ lookup('netbox.netbox.nb_lookup', 'services',
                    api_endpoint=_nb_args.api_endpoint,
                    token=_nb_args.token,
                    validate_certs=_nb_args.validate_certs,
                    api_filter=_nb_args.api_filter) }}

    - name: (Resilience) If the server-side filter errored, fall back to all services (client-side filter)
      block:
        - name: No-op (nb_services_raw already set)
          ansible.builtin.debug:
            msg: "Server-side filter result length: {{ nb_services_raw | length }}"
          changed_when: false
      rescue:
        - name: Fallback – fetch all services via nb_lookup
          vars:
            _nb_args_all:
              api_endpoint: "{{ netbox_url }}"
              token: "{{ netbox_token }}"
              validate_certs: true
          ansible.builtin.set_fact:
            nb_services_raw: >-
              {{ lookup('netbox.netbox.nb_lookup', 'services',
                        api_endpoint=_nb_args_all.api_endpoint,
                        token=_nb_args_all.token,
                        validate_certs=_nb_args_all.validate_certs) }}

    - name: Normalize NetBox services to {(ip, 'tcp', port)} for this host
      ansible.builtin.set_fact:
        nb_tcp_set: "{{ nb_services_raw | normalize_netbox_services(primary_ip) }}"

    - name: Build non-loopback listeners list
      ansible.builtin.set_fact:
        non_loopback_listeners: >-
          {{
            (ansible_facts.tcp_listen + ansible_facts.udp_listen)
            | selectattr('address', 'defined')
            | rejectattr('address', 'match', '^(127.)')
            | rejectattr('address', 'match', '^(::1)')
            | rejectattr('port', 'match', '^(68)')
            | list
          }}

    - name: Normalize local tcp_listen to {(ip, 'tcp', port)} (wildcards match primary IP)
      ansible.builtin.set_fact:
        host_tcp_set: "{{ ansible_facts.tcp_listen | normalize_tcp_listen(primary_ip) }}"

    - name: Compute diff
      ansible.builtin.set_fact:
        tcp_diff: "{{ nb_tcp_set | diff_listens(host_tcp_set) }}"

    - name: Show nb_services
      ansible.builtin.debug:
        var: nb_services_raw

    - name: Show tcp_services
      ansible.builtin.debug:
        var: host_tcp_set 

    - name: Show diff (structured)
      ansible.builtin.debug:
        var: tcp_diff

    - name: Create the device that corresponds to my asset
      delegate_to: localhost
      netbox.netbox.netbox_device:
        netbox_url:  "{{ netbox_url }}"
        netbox_token: "{{ netbox_token }}"
        data:
          name: "{{ lookup('community.general.dig', ansible_facts['default_ipv4']['address'] + '/PTR')[:-1] }}"
          device_type: vm
          device_role: vm
          site: Home
      register: device_lookup

    - name: Create the interface that corresponds to my asset
      delegate_to: localhost
      netbox.netbox.netbox_device_interface:
        netbox_url:  "{{ netbox_url }}"
        netbox_token: "{{ netbox_token }}"
        data:
          name : "{{ ansible_facts['default_ipv4']['interface'] }}"
          type: 1000Base-T
          device: "{{ lookup('community.general.dig', ansible_facts['default_ipv4']['address'] + '/PTR')[:-1] }}"

    - name: Create or update the IP address of my asset
      delegate_to: localhost
      inf0junki3.inventory.netbox_ip_assign:
        netbox_url:  "{{ netbox_url }}"
        token: "{{ netbox_token }}"
        device: "{{ lookup('community.general.dig', ansible_facts['default_ipv4']['address'] + '/PTR')[:-1] }}"
        interface: "{{ ansible_facts['default_ipv4']['interface'] }}"
        address: "{{ ansible_facts['default_ipv4']['address'] }}/24"
        dns_name: "{{ lookup('community.general.dig', ansible_facts['default_ipv4']['address'] + '/PTR')[:-1] }}"
        status: active

    - name: Catalog ports
      delegate_to: localhost
      ansible.builtin.uri:
        url: "{{ netbox_url }}/api/ipam/services/"
        method: POST
        headers:
          Authorization: "Token {{ netbox_token }}"
          Content-Type: "application/json"
        body_format: json
        body:
          name: "{{ item['name'] }}"
          parent_object_type: "dcim.device"
          parent_object_id: "{{ device_lookup.device.id }}"
          ports: "[{{ item['port'] }}]"
          protocol: "{{ item['protocol'] }}"

        status_code:
          - 200
          - 201
      loop: "{{ tcp_diff['unexpected_on_host'] | unique }}"

Step 2: Populating netbox

Now let’s talk about how to populate the netbox. The premise is to use an authenticated connection to connect to assets on the network and extract the services running on them.

Let’s make some assumptions here:

  • I assume that in the very least, I’m able to identify my (legitimate) assets, by looking at my DHCP server’s leases and running a host sweep on my infrastructure. You don’t necessarily want to be connecting up to assets you don’t control, especially with a user name and password - a great way to steal credentials is to pose as an SSH or SMB server and wait for a machine to authenticate to you. It’s best to use a machine that has minimal attack surface, uses a private key to authenticate, and have Agent forwardring disabled.

  • I assume that you’re going to run this baseline at least once manually. While it is tedious and boring to collect the services on each machine of an infrastructure, it’s important to actually look through the results and prioritize important assets.

With this out of the way:

  1. Run a host sweep of your network, e.g. with nmap -sn -oG - 192.168.1.0/24. If you’re supremely confident about not having nasty hosts listening on your network, you can convert this to a basic inventory with:
    nmap -sT -p 22 192.168.1.0/24 -oG - | grep '22/open/tcp//ssh' | awk '{print $2}' > tmp_inventory
    
  2. Run your python3 -m venv venv; source venv/bin/activate; pip install -r requirements.txt commands to get the necessary libraries.
  3. Run ansible-galaxy install -r requirements.yml to install the netbox collection.
  4. Source your .env file so that you have the netbox variables in your environment.
  5. Run the playbook in check mode: ansible-playbook netbox_collect.yml --check -i tmp_inventory
  6. If you’re happy with the result, run the playbook in “for-keeps” mode: ansible-playbook netbox_collect.yml --check -i tmp_inventory

Conclusion

Automating your baselining with netbox will help you have better visibility on what is running on your network. Being able to look up a running service in a vetted inventory of pre-approved services will help you write better firewall rules and ferret out services that shouldn’t be running on your infrastructure.

I’ve provided a sample repository here, which I hope will be of use, please feel free to comment or contribute: https://github.com/Inf0Junki3/automatic-baselining-example

Please note that I’ve written a fair amount of this code with the assistance of an AI tool. If for some reason this code includes code that was previously written and licensed differently than the MIT license I slapped on there, I would very much appreciate it if you let me know by popping an issue on my repo - apologies for any incidental re-use!