In my previous post, I described how to set up a netbox instance with ansible. In this post, I walk through using ansible scripts to populate your netbox.
Step 1: Prepare your scripts
You’re going to need three scripts and a couple of requirements files to populate your netbox
semi-automagically.
The scripts assume that you will provide your netbox URL and token via environment variables. You can create an env file that looks like this:
NETBOX_API=http://netbox.yournetwork.local:8000
NETBOX_API_KEY=aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
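Before running anything, it can help to confirm that both variables are actually set. The following is a minimal sketch and not part of the original scripts: the helper name read_netbox_env is hypothetical, and only the variable names NETBOX_API and NETBOX_API_KEY come from the env file above.

```python
import os
from typing import Mapping, Optional, Tuple


def read_netbox_env(env: Optional[Mapping[str, str]] = None) -> Tuple[str, str]:
    """Return (url, token), raising if either variable is missing or empty."""
    # Default to the real process environment when no mapping is passed in.
    env = os.environ if env is None else env
    missing = [key for key in ("NETBOX_API", "NETBOX_API_KEY") if not env.get(key)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    # Drop a trailing slash so paths can be appended to the URL consistently.
    return env["NETBOX_API"].rstrip("/"), env["NETBOX_API_KEY"]


# Illustrative values only, mirroring the placeholder env file above.
sample_env = {
    "NETBOX_API": "http://netbox.yournetwork.local:8000/",
    "NETBOX_API_KEY": "a" * 40,
}
url, token = read_netbox_env(sample_env)
print(url, len(token))
```

Calling read_netbox_env() with no arguments checks the real environment of the current process instead of the sample mapping.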
The final result once you’re through with this step should look something like this:
.
├── .env
├── filter_plugins
│   └── netbox_listen_diff.py
├── netbox_collect.yml
├── netbox_inventory.yml
├── requirements.txt
└── requirements.yml
Here’s a description of each along with the code:
Requirements files
You’ll need a requirements.txt file for your dependencies, to be executed with pip install -r requirements.txt.
Here it is:
ansible==11.9.0
ansible-compat==25.8.1
ansible-core==2.18.8
dnspython==2.8.0
pynetbox==7.5.0
The following file simply contains the collection dependency for netbox. To use it, run ansible-galaxy install -r requirements.yml.
collections:
- name: netbox.netbox
Inventory script
This uses the netbox nb_inventory plugin to build an inventory from the assets that already exist in your netbox. Save it as
netbox_inventory.yml:
plugin: netbox.netbox.nb_inventory
config_context: false
group_by:
- device_roles
Filter plugin
This is a filter plugin written in Python that lets you parse and diff services. Save it in a subdirectory
called filter_plugins with a name like netbox_listen_diff.py:
# -*- coding: utf-8 -*-
from __future__ import annotations

import ipaddress
from typing import Any, Dict, Iterable, List, Tuple, Set

from ansible.utils.display import Display

display = Display()


def _ipstr(addr: str) -> str:
    """Strip any /prefix and return the canonical string form of an IP address."""
    if not addr:
        return addr
    if "/" in addr:
        addr = addr.split("/", 1)[0]
    return str(ipaddress.ip_address(addr))


def _is_wildcard(addr: str) -> bool:
    if addr == "*":
        return True
    try:
        ip = ipaddress.ip_address(addr)
        return ip.is_unspecified  # 0.0.0.0 or ::
    except ValueError:
        return addr in ("0.0.0.0", "::")


def normalize_tcp_listen(tcp_listen: Iterable[Dict[str, Any]], primary_ip: str) -> List[Dict[str, Any]]:
    """
    Convert Ansible tcp_listen facts to a JSON/YAML-safe list of dicts:
    [{'ip': <primary_ip>, 'protocol': 'tcp', 'port': <int>, 'name': <str>}, ...]
    Wildcard binds (0.0.0.0/::) are mapped to the host's primary_ip.
    Listens bound to other local IPs are ignored (by design).
    """
    out: List[Dict[str, Any]] = []
    p_ip = _ipstr(primary_ip)
    for entry in tcp_listen or []:
        name = entry.get("name")
        port = entry.get("port")
        addr = entry.get("address") or entry.get("ip")
        # Check for missing values first: the string operations below need a real address.
        if port is None or addr is None:
            continue
        # Drop the IPv6 zone suffix, e.g. fe80::1%eth0 -> fe80::1
        if "%" in addr:
            addr = addr.split("%")[0]
        # Skip IPv4 loopback; note that '::' (the IPv6 wildcard) is skipped here as well.
        if addr.startswith("127.0.0") or addr == "::":
            continue
        try:
            port = int(port)
        except Exception:
            continue
        if _is_wildcard(addr) or _ipstr(addr) == p_ip:
            out.append({"ip": p_ip, "protocol": "tcp", "port": port, "name": name})
    return out


def normalize_netbox_services(services: Iterable[Dict[str, Any]], primary_ip: str) -> List[Dict[str, Any]]:
    """
    Convert NetBox services to a list of dicts like above, filtered to this host's primary_ip and TCP only.
    NetBox 4.3.x service schema: protocol, ports[], ipaddresses[] (each has 'address' like 'A.B.C.D/nn' or '.../display')
    """
    out: List[Dict[str, Any]] = []
    p_ip = _ipstr(primary_ip)
    for svc in services or []:
        # nb_lookup wraps each object; the service itself is under 'value'.
        service_description = svc.get("value")
        proto = str(service_description.get("protocol").get("value")).lower()
        if proto != "tcp":
            continue
        ports = service_description.get("ports") or []
        # ipas is currently unused: no per-IP filtering happens in this function.
        ipas = service_description.get("ipaddresses") or []
        name = service_description.get("name")
        for port in ports:
            try:
                port = int(port)
            except Exception:
                continue
            out.append({"ip": p_ip, "protocol": "tcp", "port": port, "name": name})
    return out


def _to_tuple_set(items: Iterable[Any]) -> Set[Tuple[str, str, int, str]]:
    """
    Robustly convert a list of dicts/tuples to a set of (ip, protocol, port, name).
    Ignores strings (e.g., AnsibleUnsafeText) gracefully.
    """
    s: Set[Tuple[str, str, int, str]] = set()
    for x in items or []:
        try:
            if isinstance(x, dict):
                ip = _ipstr(x.get("ip") or x.get("address") or "")
                proto = str(x.get("protocol", "")).lower()
                port = int(x.get("port"))
                name = str(x.get("name"))
                if ip and proto and port is not None:
                    s.add((ip, proto, port, name))
            elif isinstance(x, (list, tuple)) and len(x) == 4:
                ip, proto, port, name = x
                ip = _ipstr(str(ip))
                proto = str(proto).lower()
                port = int(port)
                s.add((ip, proto, port, name))
            else:
                # ignore strings/unknowns
                pass
        except Exception:
            continue
    return s


def _quads_to_dicts(quads: Iterable[Tuple[str, str, int, str]]) -> List[Dict[str, Any]]:
    return [{"ip": ip, "protocol": proto, "port": int(port), "name": name} for (ip, proto, port, name) in quads]


def diff_listens(netbox_items: Iterable[Any], host_items: Iterable[Any]) -> Dict[str, Any]:
    """
    Accepts any JSON/YAML-safe sequences (lists of dicts/tuples).
    Converts to sets internally to compute:
    matched, missing_on_host, unexpected_on_host
    Returns lists of dicts (JSON/YAML friendly).
    """
    net_set = _to_tuple_set(netbox_items)
    host_set = _to_tuple_set(host_items)
    matched = sorted(list(net_set & host_set))
    missing_on_host = sorted(list(net_set - host_set))
    unexpected_on_host = sorted(list(host_set - net_set))
    display.display(str(matched))
    display.display(str(missing_on_host))
    display.display(str(unexpected_on_host))
    return {
        "matched": _quads_to_dicts(matched),
        "missing_on_host": _quads_to_dicts(missing_on_host),
        "unexpected_on_host": _quads_to_dicts(unexpected_on_host),
    }


class FilterModule(object):
    def filters(self):
        return {
            "normalize_tcp_listen": normalize_tcp_listen,
            "normalize_netbox_services": normalize_netbox_services,
            "diff_listens": diff_listens,
        }
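To make the behaviour of diff_listens concrete, here is a stripped-down, standalone sketch of the same set arithmetic. It deliberately does not import the plugin (which needs Ansible installed); the function name diff_quads and the sample data are made up for illustration.

```python
from typing import Dict, Iterable, List, Tuple

Quad = Tuple[str, str, int, str]  # (ip, protocol, port, name)


def diff_quads(netbox: Iterable[Quad], host: Iterable[Quad]) -> Dict[str, List[Quad]]:
    """Compare what NetBox documents against what the host is listening on."""
    net_set, host_set = set(netbox), set(host)
    return {
        "matched": sorted(net_set & host_set),             # documented and running
        "missing_on_host": sorted(net_set - host_set),     # documented, not running
        "unexpected_on_host": sorted(host_set - net_set),  # running, not documented
    }


# Hypothetical sample data for a single host.
netbox_services = [("192.168.1.10", "tcp", 22, "sshd"), ("192.168.1.10", "tcp", 443, "nginx")]
host_listeners = [("192.168.1.10", "tcp", 22, "sshd"), ("192.168.1.10", "tcp", 8080, "python3")]

result = diff_quads(netbox_services, host_listeners)
print(result["unexpected_on_host"])
```

Note that the name is part of the tuple, both here and in the plugin. A service recorded in netbox under a different name than the process name reported on the host will therefore show up as both missing and unexpected.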
Collection playbook
You now need a collection playbook. This connects to each host in the inventory, catalogs the ports, and then uses the
filter plugin to diff and selectively add them to netbox. Let’s call this netbox_collect.yml.
---
- name: Collect the inventory of all my assets
  hosts: all
  vars:
    netbox_url: "{{ lookup('ansible.builtin.env', 'NETBOX_API') }}"
    netbox_token: "{{ lookup('ansible.builtin.env', 'NETBOX_API_KEY') }}"
  pre_tasks:
    - name: Ensure network facts (for tcp_listen)
      ansible.builtin.setup:
        gather_subset:
          - network
      when: ansible_facts.tcp_listen is not defined
    - name: Determine primary IP (prefer IPv4 then IPv6)
      ansible.builtin.set_fact:
        primary_ip: >-
          {{ (ansible_facts.get('default_ipv4', {}).get('address'))
          | default(ansible_facts.get('default_ipv6', {}).get('address'), true) }}
    - name: Sanity check primary IP
      ansible.builtin.assert:
        that:
          - primary_ip is string
          - primary_ip | length > 0
        fail_msg: "Could not determine primary IP (default_ipv4/default_ipv6)."
  tasks:
    - name: Gather info about listening ports
      become: true
      become_user: root
      community.general.listen_ports_facts:
        command: 'ss'
    - name: Fetch NetBox services filtered by exact IP (server-side) via nb_lookup
      vars:
        _nb_args:
          api_endpoint: "{{ netbox_url }}"
          token: "{{ netbox_token }}"
          validate_certs: true
          # NetBox 4.3.6 supports filtering services by ipaddress
          api_filter: "ipaddress={{ primary_ip }}&limit=0"
      ansible.builtin.set_fact:
        nb_services_raw: >-
          {{ lookup('netbox.netbox.nb_lookup', 'services',
          api_endpoint=_nb_args.api_endpoint,
          token=_nb_args.token,
          validate_certs=_nb_args.validate_certs,
          api_filter=_nb_args.api_filter) }}
    - name: (Resilience) If the server-side filter errored, fall back to all services (client-side filter)
      block:
        - name: No-op (nb_services_raw already set)
          ansible.builtin.debug:
            msg: "Server-side filter result length: {{ nb_services_raw | length }}"
          changed_when: false
      rescue:
        - name: Fallback – fetch all services via nb_lookup
          vars:
            _nb_args_all:
              api_endpoint: "{{ netbox_url }}"
              token: "{{ netbox_token }}"
              validate_certs: true
          ansible.builtin.set_fact:
            nb_services_raw: >-
              {{ lookup('netbox.netbox.nb_lookup', 'services',
              api_endpoint=_nb_args_all.api_endpoint,
              token=_nb_args_all.token,
              validate_certs=_nb_args_all.validate_certs) }}
    - name: Normalize NetBox services to {(ip, 'tcp', port)} for this host
      ansible.builtin.set_fact:
        nb_tcp_set: "{{ nb_services_raw | normalize_netbox_services(primary_ip) }}"
    - name: Build non-loopback listeners list
      ansible.builtin.set_fact:
        non_loopback_listeners: >-
          {{
          (ansible_facts.tcp_listen + ansible_facts.udp_listen)
          | selectattr('address', 'defined')
          | rejectattr('address', 'match', '^(127.)')
          | rejectattr('address', 'match', '^(::1)')
          | rejectattr('port', 'match', '^(68)')
          | list
          }}
    - name: Normalize local tcp_listen to {(ip, 'tcp', port)} (wildcards match primary IP)
      ansible.builtin.set_fact:
        host_tcp_set: "{{ ansible_facts.tcp_listen | normalize_tcp_listen(primary_ip) }}"
    - name: Compute diff
      ansible.builtin.set_fact:
        tcp_diff: "{{ nb_tcp_set | diff_listens(host_tcp_set) }}"
    - name: Show nb_services
      ansible.builtin.debug:
        var: nb_services_raw
    - name: Show tcp_services
      ansible.builtin.debug:
        var: host_tcp_set
    - name: Show diff (structured)
      ansible.builtin.debug:
        var: tcp_diff
    - name: Create the device that corresponds to my asset
      delegate_to: localhost
      netbox.netbox.netbox_device:
        netbox_url: "{{ netbox_url }}"
        netbox_token: "{{ netbox_token }}"
        data:
          name: "{{ lookup('community.general.dig', ansible_facts['default_ipv4']['address'] + '/PTR')[:-1] }}"
          device_type: vm
          device_role: vm
          site: Home
      register: device_lookup
    - name: Create the interface that corresponds to my asset
      delegate_to: localhost
      netbox.netbox.netbox_device_interface:
        netbox_url: "{{ netbox_url }}"
        netbox_token: "{{ netbox_token }}"
        data:
          name: "{{ ansible_facts['default_ipv4']['interface'] }}"
          type: 1000Base-T
          device: "{{ lookup('community.general.dig', ansible_facts['default_ipv4']['address'] + '/PTR')[:-1] }}"
    - name: Create or update the IP address of my asset
      delegate_to: localhost
      inf0junki3.inventory.netbox_ip_assign:
        netbox_url: "{{ netbox_url }}"
        token: "{{ netbox_token }}"
        device: "{{ lookup('community.general.dig', ansible_facts['default_ipv4']['address'] + '/PTR')[:-1] }}"
        interface: "{{ ansible_facts['default_ipv4']['interface'] }}"
        address: "{{ ansible_facts['default_ipv4']['address'] }}/24"
        dns_name: "{{ lookup('community.general.dig', ansible_facts['default_ipv4']['address'] + '/PTR')[:-1] }}"
        status: active
    - name: Catalog ports
      delegate_to: localhost
      ansible.builtin.uri:
        url: "{{ netbox_url }}/api/ipam/services/"
        method: POST
        headers:
          Authorization: "Token {{ netbox_token }}"
          Content-Type: "application/json"
        body_format: json
        body:
          name: "{{ item['name'] }}"
          parent_object_type: "dcim.device"
          parent_object_id: "{{ device_lookup.device.id }}"
          ports: "[{{ item['port'] }}]"
          protocol: "{{ item['protocol'] }}"
        status_code:
          - 200
          - 201
      loop: "{{ tcp_diff['unexpected_on_host'] | unique }}"
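The body of the final "Catalog ports" task can be easier to reason about as a plain dictionary. The sketch below builds the same fields in Python without talking to netbox; build_service_payload is a hypothetical helper, and the device id 42 is a placeholder for whatever device_lookup.device.id returns in the playbook.

```python
import json
from typing import Any, Dict


def build_service_payload(item: Dict[str, Any], device_id: int) -> Dict[str, Any]:
    """Build the JSON body for one entry of tcp_diff['unexpected_on_host']."""
    return {
        "name": str(item["name"]),
        "parent_object_type": "dcim.device",
        "parent_object_id": int(device_id),
        "ports": [int(item["port"])],  # a list, even for a single port
        "protocol": str(item["protocol"]).lower(),
    }


# Hypothetical diff entry, in the shape produced by the filter plugin.
entry = {"ip": "192.168.1.10", "protocol": "tcp", "port": 8080, "name": "python3"}
payload = build_service_payload(entry, device_id=42)
print(json.dumps(payload, indent=2))
```

The ip key of the diff entry is not sent: in the playbook the service is attached to the device, not to an individual address.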
Step 2: Populating netbox
Now let’s talk about how to populate the netbox. The premise is to use an authenticated connection to connect to assets on the network and extract the services running on them.
Let’s make some assumptions here:
- I assume that, at the very least, I’m able to identify my (legitimate) assets by looking at my DHCP server’s leases and running a host sweep on my infrastructure. You don’t necessarily want to be connecting to assets you don’t control, especially with a user name and password: a great way to steal credentials is to pose as an SSH or SMB server and wait for a machine to authenticate to you. It’s best to connect from a machine that has minimal attack surface, uses a private key to authenticate, and has agent forwarding disabled.
- I assume that you’re going to run this baseline at least once manually. While it is tedious and boring to collect the services on each machine of an infrastructure, it’s important to actually look through the results and prioritize important assets.
With this out of the way:
- Run a host sweep of your network, e.g. with:
  nmap -sn -oG - 192.168.1.0/24
  If you’re supremely confident about not having nasty hosts listening on your network, you can convert this to a basic inventory with:
  nmap -sT -p 22 192.168.1.0/24 -oG - | grep '22/open/tcp//ssh' | awk '{print $2}' > tmp_inventory
- Create a virtual environment and install the necessary libraries:
  python3 -m venv venv; source venv/bin/activate; pip install -r requirements.txt
- Install the netbox collection:
  ansible-galaxy install -r requirements.yml
- Source your .env file so that you have the netbox variables in your environment. The file contains plain assignments without export, so make sure the variables are exported to child processes, for example:
  set -a; source .env; set +a
- Run the playbook in check mode:
  ansible-playbook netbox_collect.yml --check -i tmp_inventory
- If you’re happy with the result, run the playbook in “for-keeps” mode, this time without --check:
  ansible-playbook netbox_collect.yml -i tmp_inventory
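The grep and awk pipeline from the host sweep step can also be written in Python, which is handy if you want to add your own filtering before trusting the inventory. This is a rough sketch: the function name ssh_hosts_from_grepable is hypothetical, and the sample lines are invented to resemble nmap's grepable (-oG) output.

```python
from typing import Iterable, List


def ssh_hosts_from_grepable(lines: Iterable[str]) -> List[str]:
    """Return the address of every host whose port 22 is reported open for ssh."""
    hosts: List[str] = []
    for line in lines:
        # Same match as: grep '22/open/tcp//ssh'
        if "22/open/tcp//ssh" not in line:
            continue
        # Same extraction as: awk '{print $2}' (second whitespace-separated field)
        fields = line.split()
        if len(fields) >= 2 and fields[0] == "Host:":
            hosts.append(fields[1])
    return hosts


# Invented sample resembling grepable output; addresses are placeholders.
sample = [
    "# Nmap scan (sample output, made up for this example)",
    "Host: 192.168.1.10 ()\tStatus: Up",
    "Host: 192.168.1.10 ()\tPorts: 22/open/tcp//ssh///",
    "Host: 192.168.1.20 ()\tPorts: 22/closed/tcp//ssh///",
    "Host: 192.168.1.30 (nas.local)\tPorts: 22/open/tcp//ssh///",
]
inventory = ssh_hosts_from_grepable(sample)
print("\n".join(inventory))
```

Writing the resulting list to tmp_inventory, one address per line, gives the same kind of file the shell pipeline produces.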
Conclusion
Automating your baselining with netbox will help you have better visibility on what is running on your network. Being able to look up a running service in a vetted inventory of pre-approved services will help you write better firewall rules and ferret out services that shouldn’t be running on your infrastructure.
I’ve provided a sample repository here, which I hope will be of use. Please feel free to comment or contribute: https://github.com/Inf0Junki3/automatic-baselining-example
Please note that I’ve written a fair amount of this code with the assistance of an AI tool. If for some reason this code includes code that was previously written and licensed differently than the MIT license I slapped on there, I would very much appreciate it if you let me know by popping an issue on my repo - apologies for any incidental re-use!