FlowTuple

See the latest slide deck about FlowTuples, presented in 2021.

Overview

FlowTuple data is an aggregated representation of traffic captured at the network telescope that enables more efficient processing and analysis for many research use cases that do not need access to the full packet contents. In addition, the FlowTuple format includes metadata associated with the source IP address of each FlowTuple, such as IP geolocation, IP-to-ASN mapping, or whether we categorize the source address as spoofed.

Definition

A STARDUST FlowTuple has a slightly different definition than the typical flow tuple (i.e., the 5-tuple of source IP, destination IP, source port, destination port, and transport protocol). The STARDUST FlowTuple takes additional header fields (e.g., TCP flags) into account because of the characteristics of the various components found in unsolicited one-way traffic and the most common types of analyses that researchers perform.

Properties

FlowTuples are stored in the Avro data format. A single flow entry is distinguished by the timestamp, source IP address, destination IP network (within a /24 subnet mask), destination port, and protocol.
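As an illustration, the key fields above can be combined into a Python tuple. This is a minimal sketch (not the STARDUST implementation) showing how aggregating the destination to a /24 network groups packets sent to nearby destination addresses into the same flow:

```python
import ipaddress

def flow_key(ts, src_ip, dst_ip, dst_port, protocol):
    """Illustrative flow key: (time, src_ip, dst_net, dst_port, protocol).

    The destination IP is aggregated to its /24 network, so packets to
    192.0.2.17 and 192.0.2.200 with the same port/protocol in the same
    interval map to the same key.
    """
    src = int(ipaddress.ip_address(src_ip))
    dst_net = int(ipaddress.ip_address(dst_ip)) & 0xFFFFFF00  # /24 mask
    return (ts, src, dst_net, dst_port, protocol)
```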

Data fields in a FlowTuple record the following:

  1. uniq_<data field>: the number of unique values seen for the data field
  2. common_<data field>: an array containing any data values that occur frequently for the flow
  3. common_<data field>_freqs: an array containing the frequency at which each value in the preceding array occurred
    • A note about required frequencies: to be considered a “frequently occurring” value, the value must be seen in at least 20% of the packets matched to a given flow. For flows with a low packet count, the ratio is increased to compensate for the small sample size:

      | Total Flow Packets | Minimum Ratio |
      | --- | --- |
      | 1 - 4 | 1.0 |
      | 5 - 6 | 0.5 |
      | 7 - 14 | 0.33 |
      | 15+ | 0.2 |
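The threshold rules above can be sketched in Python (an illustrative helper, not part of any STARDUST library):

```python
def min_common_ratio(total_pkts):
    """Minimum fraction of a flow's packets a value must appear in
    to count as 'frequently occurring', per the table above."""
    if total_pkts <= 4:
        return 1.0
    if total_pkts <= 6:
        return 0.5
    if total_pkts <= 14:
        return 0.33
    return 0.2

def is_common(value_count, total_pkts):
    """True if a value seen value_count times qualifies as common."""
    return value_count / total_pkts >= min_common_ratio(total_pkts)
```

For example, a value seen in 5 of 14 packets qualifies (5/14 ≈ 0.36 ≥ 0.33), but one seen in 3 of 14 does not.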
Contents of a FlowTuple v4 Record

| Field Name | Type | Role | Notes |
| --- | --- | --- | --- |
| time | long | Key | Timestamp of the interval that this FlowTuple belongs to |
| src_ip | long | Key | Source IP address, as an integer |
| dst_net | long | Key | The destination IP network (/24), as an integer |
| dst_port | long | Key | The destination port for TCP and UDP flows; (type << 8) + code for ICMP flows |
| protocol | int | Key | The transport protocol used, e.g. 6 = TCP, 17 = UDP |
| packet_cnt | long | Counter | Number of packets seen in this interval that match this flow description |
| uniq_dst_ips | long | Counter | Number of unique destination IPs seen for this flow |
| uniq_pkt_sizes | long | Counter | Number of unique packet sizes seen for this flow |
| uniq_ttls | long | Counter | Number of unique IP TTLs seen for this flow |
| uniq_src_ports | long | Counter | Number of unique source ports seen for this flow (TCP and UDP only) |
| uniq_tcp_flags | long | Counter | Number of unique TCP flag combinations seen for this flow (TCP only) |
| first_syn_length | int | First | Only applies to TCP flows; the size of the TCP header (i.e. doff * 4) for the first observed packet |
| first_tcp_rwin | int | First | Only applies to TCP flows; the receive window announced in the first observed TCP SYN packet |
| common_pktsizes | array(long) | Observed Values | Array containing packet sizes that were frequently observed for this flow |
| common_pktsize_freqs | array(long) | Frequencies | Array containing frequencies for the packet sizes listed in the common_pktsizes array |
| common_ttls | array(long) | Observed Values | Array containing IP TTLs that were frequently observed for this flow |
| common_ttl_freqs | array(long) | Frequencies | Array containing frequencies for the IP TTLs listed in the common_ttls array |
| common_srcports | array(long) | Observed Values | Array containing TCP/UDP source ports that were frequently observed for this flow |
| common_srcport_freqs | array(long) | Frequencies | Array containing frequencies for the source ports listed in the common_srcports array |
| common_tcpflags | array(long) | Observed Values | Array containing TCP flag combinations that were frequently observed for this flow |
| common_tcpflag_freqs | array(long) | Frequencies | Array containing frequencies for the TCP flag combinations listed in the common_tcpflags array |
| maxmind_continent | string | Derived from Source IP | Geolocation of the source IP address, according to Maxmind (continent level) |
| maxmind_country | string | Derived from Source IP | Geolocation of the source IP address, according to Maxmind (country level) |
| netacq_continent | string | Derived from Source IP | Geolocation of the source IP address, according to Netacq-Edge (continent level) |
| netacq_country | string | Derived from Source IP | Geolocation of the source IP address, according to Netacq-Edge (country level) |
| prefix2asn | long | Derived from Source IP | ASN that the source IP address belongs to, according to the prefix2asn dataset |
| spoofed_packet_cnt | long | Counter | Number of packets where the source IP address was inferred to be spoofed |
| masscan_packet_cnt | long | Counter | Number of packets that were inferred to be sent by the masscan tool |
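As an illustrative sketch (the helper names are our own, not part of a STARDUST library), the integer-encoded key fields above can be decoded like this:

```python
import ipaddress

def decode_src_ip(src_ip):
    """Convert the integer-encoded src_ip field to dotted-quad notation."""
    return str(ipaddress.ip_address(src_ip))

def decode_dst_port(dst_port, protocol):
    """For ICMP flows (protocol 1), dst_port packs (type << 8) + code;
    for TCP/UDP it is the actual destination port."""
    if protocol == 1:
        return {"icmp_type": dst_port >> 8, "icmp_code": dst_port & 0xFF}
    return dst_port
```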

User Guide

FlowTuple data is stored in Openstack Swift using the Apache Avro data format. Specifically, data for each year is saved in a separate Swift container:

  • telescope-ucsdnt-avro-flowtuple-v4-2021
  • telescope-ucsdnt-avro-flowtuple-v4-2020
  • telescope-ucsdnt-avro-flowtuple-v4-...

Users can read the Avro files directly using existing Avro-compatible tools or libraries, if desired, but the best way to work with FlowTuple data is to write Python scripts using PyAvro-STARDUST or the STARDUST Pyspark API.

Note: Flowtuple File Name Formats

  • August 2008 to July 2020 : v2_5min_<timestamp>.ft4.avro
  • August 2020 to August 2021 : v3_5min_<timestamp>.ft4.avro
  • September 2021 to Present : ucsd-nt.<timestamp>.flowtuple-v4.avro
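The naming epochs above can be sketched as a small helper (illustrative only: the epoch boundaries are approximated at UTC month granularity, and <timestamp> is assumed to be a Unix timestamp):

```python
from datetime import datetime, timezone

# Approximate epoch boundaries, taken from the file name format list above
V3_START = datetime(2020, 8, 1, tzinfo=timezone.utc).timestamp()
V4_START = datetime(2021, 9, 1, tzinfo=timezone.utc).timestamp()

def flowtuple_object_name(ts):
    """Return the expected object name for a 5-minute interval starting
    at Unix timestamp ts, following the naming epochs above."""
    if ts < V3_START:
        return f"v2_5min_{ts}.ft4.avro"
    if ts < V4_START:
        return f"v3_5min_{ts}.ft4.avro"
    return f"ucsd-nt.{ts}.flowtuple-v4.avro"
```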

Processing FlowTuples with PyAvro-STARDUST

You can write Python analysis code that will process the flowtuple Avro files in the STARDUST object store directly using the PyAvro-STARDUST module. This module provides a simple interface that aims to be faster and easier to use than existing Python Avro libraries (such as fastavro).

Documentation on how to install and use PyAvro-STARDUST can be found in the GitHub repository.

An example script for processing flowtuple v4 data can be found here.
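Whichever reader you use, record-processing code only needs the field names from the table above. Below is a minimal, library-agnostic sketch that tallies packets per transport protocol; the records argument can be any iterable of FlowTuple v4 records as dicts (e.g. what fastavro's reader yields):

```python
from collections import Counter

def packets_per_protocol(records):
    """Tally packet_cnt per transport protocol across FlowTuple v4 records."""
    totals = Counter()
    for rec in records:
        totals[rec["protocol"]] += rec["packet_cnt"]
    return totals
```

With fastavro installed, this could be invoked as `packets_per_protocol(fastavro.reader(open(path, "rb")))`.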

Processing FlowTuples with the STARDUST Pyspark API

For larger scale processing, we have developed an API for writing Python scripts that can run flowtuple analysis jobs on an Apache Spark cluster. The API allows you to specify a time period that you are interested in, then run methods that perform common filtering or data processing tasks (e.g., filtering flowtuples by source address prefix, or finding the top 10 most frequent values for a metric).

You can also run raw Spark SQL queries directly against a set of flowtuples, if the exact analysis that you want to perform is not implemented as an API method.

This API is still a work in progress, but our implementation thus far is available here.

Some example flowtuple analysis code written using the STARDUST Pyspark API is given below:

from stardust import StardustPysparkHelper

# creating an instance of a helper
sd = StardustPysparkHelper("telescope-ucsdnt-avro-flowtuple-v4-2021",
        "ucsd-nt", "v3_5min")

# starting a spark session with the name "test", using 4 partitions (CPUs)
sd.startSparkSession("test", 4)

# getting flowtuple records from a specific time range
range_recs = sd.getFlowtuplesByTimeRange(1612130100, 1612133700)


# filtering a set of flowtuples based on a source IP prefix
# NOTE: the prefix must be a unicode string
prefix_df = sd.filterFlowtuplesByPrefix(range_recs, u"1.0.0.0/8")

# get the number of flows that matched our filter prefix
print(prefix_df.count())

# print the first 20 flows that matched our query
# ALWAYS use collect() to convert a data frame into a list
# of rows if you want to inspect the results
results = prefix_df.limit(20).collect()
for r in results:
    # this prints the raw row object, which is fine for debugging but
    # you'll want to do some proper formatting for usable output
    print(r)
print()


# finding flowtuples that match a set of filtering criteria
# base will contain flowtuples with a Russian source IP,
# sect will contain flowtuples with a Russian source IP that
# use multiple TTLs.
sect, base = sd.createFlowtupleIntersection(range_recs,
        ["netacq_country == 'RU'", "uniq_ttls > 1"])

# generate aggregated report-style time series data for both the
# intersection and the base set we just generated (the last argument
# is just a label for the output series)
sect_report = sd.generateReportOutputFromFlowtuples(sect, "example",
        "ftintersect", "RU_multiTTL")
base_report = sd.generateReportOutputFromFlowtuples(base, "example",
        "ftintersect", "RU_all")

# again, use collect() to convert our dataframes into rows that we can
# write as output
sect_results = sect_report.collect()
base_results = base_report.collect()

# dump the time series to stdout as pairs of Row objects
# this is a bit lazy -- if you were doing this for real,
# you would want better error checking and output formatting
for i in range(0, sect_report.count()):
    print(sect_results[i], base_results[i])


# get all flowtuples where the TTL falls between 20 and 30 OR 70 and 80
q_res = sd.filterFlowtuplesByCommonValue(range_recs, "common_ttls",
        [(20, 30), (70, 80)])

# show the top 10 destination ports (by flow count) for Russian source IPs
# setting the last parameter to True will also include an "Other" category
topn = sd.getTopValuesByFlowCount(base, "dst_port", 10, True)
for k, v in topn.items():
    print(k, v)

# there are also other handy methods, see the module documentation
# (can be accessed using help())