Using ntopng as Generic Flow Collector (log files, firewall events, MQTT…)


Most users in our community use ntopng as a flow (sFlow/NetFlow/IPFIX) collector together with ntop tools such as nProbe or nProbe Cento. From time to time we receive inquiries about using it as a generic flow collector, for instance to read connection information from firewalls, log files/syslog, MQTT, or cloud formats. This blog post shows you how to do that.

ntopng can collect information via ZMQ, so the simplest mechanism is to export data on top of this protocol. ntopng accepts two formats implemented by the nDPI serialization library:

  • binary TLV (all versions)
  • JSON (not supported in the community edition).

Serialized data can be sent uncompressed or zlib-compressed. The data format has to be <ID> <VALUE>, where <ID> is a number identifying an information element defined in RFC 3954. For example, L4_SRC_PORT is identified by ID 7 and IPV6_SRC_ADDR by ID 27. Please note that it is mandatory to specify the source and destination IP address and port, the protocol, and bytes/packets in and out.
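To keep the numeric IDs readable, a small lookup table can translate field names into the ID-keyed form described above. The following is a minimal sketch: the IDs are the RFC 3954 field type values, while the names `FIELD_IDS`, `MANDATORY`, and `to_ntop_flow`, as well as the name-keyed input record, are illustrative assumptions and not part of ntopng. The mandatory check covers the IPv4 case only.

```python
# NetFlow v9 (RFC 3954) field type IDs for the elements used in this post.
FIELD_IDS = {
    "IN_BYTES": 1,
    "IN_PKTS": 2,
    "PROTOCOL": 4,
    "L4_SRC_PORT": 7,
    "IPV4_SRC_ADDR": 8,
    "L4_DST_PORT": 11,
    "IPV4_DST_ADDR": 12,
    "LAST_SWITCHED": 21,
    "FIRST_SWITCHED": 22,
    "OUT_BYTES": 23,
    "OUT_PKTS": 24,
    "IPV6_SRC_ADDR": 27,
    "IPV6_DST_ADDR": 28,
}

# Fields listed as mandatory in the text (IPv4 variant, hypothetical selection).
MANDATORY = (
    "IPV4_SRC_ADDR", "L4_SRC_PORT", "IPV4_DST_ADDR", "L4_DST_PORT",
    "PROTOCOL", "IN_BYTES", "IN_PKTS", "OUT_BYTES", "OUT_PKTS",
)


def to_ntop_flow(record):
    """Convert a name-keyed record into a dict keyed by field ID strings."""
    missing = [name for name in MANDATORY if name not in record]
    if missing:
        raise ValueError("missing mandatory fields: " + ", ".join(missing))
    # Unknown field names raise KeyError, which surfaces typos early.
    return {str(FIELD_IDS[name]): value for name, value in record.items()}
```

A record such as `{"L4_SRC_PORT": 1234, ...}` then becomes `{"7": 1234, ...}`, which can be serialized with `json.dumps`.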

For instance, in Python you have to write something like:

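import json
import time
import zlib

import zmq

# connection, socket, msg_id and source_id, as well as the helpers
# rfc3339_to_unix_epoch() and build_zmq_msg_hdr(), are defined elsewhere.

# Keys are RFC 3954 field IDs
ntop_flow = {
    "1":  int(connection["bytes_sent"]),                    # IN_BYTES
    "2":  int(connection["packets_sent"]),                  # IN_PKTS
    "22": rfc3339_to_unix_epoch(connection["start_time"]),  # FIRST_SWITCHED
    "21": rfc3339_to_unix_epoch(connection["end_time"]),    # LAST_SWITCHED
    "8":  connection["src_ip"],                             # IPV4_SRC_ADDR
    "7":  int(connection["src_port"]),                      # L4_SRC_PORT
    "12": connection["dest_ip"],                            # IPV4_DST_ADDR
    "11": int(connection["dest_port"]),                     # L4_DST_PORT
    "4":  int(connection["protocol"]),                      # PROTOCOL
}

# Serialize the flow to JSON and compress it with zlib
ntop_flow_compressed = zlib.compress(json.dumps(ntop_flow).encode())
# len() is the payload size in bytes (sys.getsizeof would add Python object overhead)
header = build_zmq_msg_hdr("flow", len(ntop_flow_compressed), msg_id, source_id)
flags = 0
time.sleep(0.01)
# Two-part message: header first, then the payload
socket.send(header, flags | zmq.SNDMORE)
rc = socket.send(ntop_flow_compressed, flags)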

This sends the flow on the topic “flow”, on which ntopng is listening.
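The snippet above calls rfc3339_to_unix_epoch() without showing it. One possible implementation is sketched below using only the Python standard library. It assumes that a timestamp without a UTC offset is meant as UTC, and that fractional seconds, if present, are in a form datetime.fromisoformat() accepts (3 or 6 digits on Python versions before 3.11). The actual helper used for this post may differ.

```python
from datetime import datetime, timezone


def rfc3339_to_unix_epoch(timestamp):
    """Return the Unix epoch in whole seconds for an RFC 3339 timestamp string."""
    # datetime.fromisoformat() before Python 3.11 rejects a trailing "Z",
    # so normalize it to an explicit UTC offset first.
    if timestamp.endswith(("Z", "z")):
        timestamp = timestamp[:-1] + "+00:00"
    parsed = datetime.fromisoformat(timestamp)
    # Assumption: a timestamp without an offset is treated as UTC.
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return int(parsed.timestamp())
```

Whole seconds are returned because the start and end times in the flow are sent as integer epoch values.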

Enjoy!