Database design for Web-scale ad hoc queries

In a previous post, Beyond Hadoop, we looked at how the MapReduce approach to database design runs up against performance limits when queries can’t be defined in advance, and we introduced Google’s response to those limits: Dremel/BigQuery. With Dremel, Google pointed the way toward an architecture that enables a database to execute exceedingly fast ad-hoc queries over large datasets using an ANSI SQL query language. Here at Kentik, we’ve applied many of the same concepts to Kentik Data Engine™ (KDE), a datastore optimized for querying IP flow records (NetFlow v5/9, sFlow, IPFIX) and related network data (GeoIP, BGP, SNMP). In this series, we’ll take a tour of KDE and also quantify some of its performance and scale characteristics.

KDE is the backend of Kentik Detect™ and as such enables users to query network data and view visualizations via the Kentik portal, a fast, intuitive UI. But KDE is also exposed to users via REST API (see Kentik APIs) and direct SQL (see Connecting to KDE). For this discussion we’ll access the backend directly so that we can get an “under the hood” look at the response times and how the queries are structured.

KDE records structure

First, let’s look at the structure of the records in the datastore. We can grab some individual flows from the last 5 seconds (or any time range), one row per flow record. Note that in this and all following examples, all identifying information (IP addresses, router names, AS names and numbers, subnets, etc.) has been anonymized:

SELECT protocol AS proto,
   ipv4_src_addr AS src_addr,
   l4_src_port AS sport,
   ipv4_dst_addr AS dst_addr,
   l4_dst_port AS dport,
   in_bytes AS bytes,
   in_pkts AS pkts
FROM big_backbone_router
WHERE i_start_time > Now() - interval '5 seconds'
LIMIT 5;
|   proto | src_addr       |   sport | dst_addr      |   dport |   bytes |   pkts |
|      17 | 10.235.226.99  |   26085 | 10.20.177.155 |   20815 |    2576 |      2 |
|       6 | 10.217.129.102 |   51130 | 10.16.54.83   |      80 |      52 |      1 |
|       6 | 10.93.39.104   |    9482 | 10.17.123.38  |   40558 |      52 |      1 |
|       6 | 10.246.217.104 |   61815 | 10.18.213.199 |    9050 |      52 |      1 |
|       6 | 10.246.217.104 |   45063 | 10.21.80.86   |    9050 |      52 |      1 |
SELECT 5
Time: 0.438s

The example above shows just a small subset of the available columns (see Main Table Schema for the full list). The bulk of the queryable columns are related to flow fields, but we also include many additional columns derived by correlating flow records with other data sources (BGP, GeoIP, SNMP) as the flows are ingested into KDE. KDE makes new data available immediately; flow records that were received less than 5 seconds ago are already available to query.

How big is big?

Next, let’s look at capacity: how big is our “big data”? To find out, we can query KDE to see how many flows we collected over the last week from a real carrier backbone device — we’ll call it “big backbone router” — that pushes ~250 Gbps:

SELECT Sum(protocol) AS f_count_protocol
FROM big_backbone_router
WHERE i_fast_dataset = FALSE
  AND i_start_time > Now() - interval '1 week';
|   f_count_protocol |
|         1764095962 |
SELECT 1
Time: 4.726s

The answer (1,764,095,962) shows that KDE is able to query nearly 1.8 billion rows and still respond in under five seconds. How? In part it’s because the query was split into 10,080 subqueries (7 days × 24 hours × 60 minutes), each representing a one-minute “slice” (see Subqueries, Slices, and Shards) of the one-week time range. These subqueries were executed in parallel over many nodes, after which the results were summed into a single response.
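To make the slice-and-sum idea concrete, here is a minimal Python sketch of the pattern. It is not KDE's implementation: the function names, the thread pool, the fixed dates, and the placeholder per-slice count are all assumptions made for illustration. The point is only that a week divides into 7 × 24 × 60 = 10,080 one-minute slices, each of which can be counted independently and then summed.

```python
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta

SLICE = timedelta(minutes=1)

def minute_slices(start, end):
    """Yield (slice_start, slice_end) pairs covering [start, end) in one-minute steps."""
    cursor = start
    while cursor < end:
        yield cursor, min(cursor + SLICE, end)
        cursor += SLICE

def count_flows_in_slice(bounds):
    """Hypothetical stand-in for one subquery; a real node would scan its stored slice."""
    slice_start, slice_end = bounds
    return 175  # placeholder: pretend every slice holds 175 flow records

def count_flows(start, end, workers=32):
    """Fan the per-slice subqueries out in parallel, then sum the partial counts."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return sum(pool.map(count_flows_in_slice, minute_slices(start, end)))

# Arbitrary example window of exactly one week.
end = datetime(2016, 1, 8)
start = end - timedelta(weeks=1)
slices = list(minute_slices(start, end))
total = count_flows(start, end)
print(len(slices), total)
```

Because each slice is independent, the final answer is just the sum of the partial results, which is what lets the work spread across many nodes.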

You’ll notice that we didn’t use the standard COUNT(*) syntax. That’s because the distributed nature of KDE requires some special handling for aggregation. We’ve overloaded the “AS” keyword to allow us to pass in Kentik-specific aggregation functions (which always start with “f_”). You can read more about that in Subquery Function Syntax. In this example we chose to count the “protocol” column because it’s the narrowest field (1 byte) and provides the best performance for COUNT().

Caching for faster results

Another aspect of KDE is that since historical data doesn’t change after it’s ingested, we’re able to take advantage of caching subquery results. To see how this helps, we can re-run the same query a few minutes later. This time the query executes in less than half a second, approximately ten times faster than the first time:

SELECT Sum(protocol) AS f_count_protocol
FROM big_backbone_router
WHERE i_fast_dataset = FALSE
  AND i_start_time > Now() - interval '1 week';
|   f_count_protocol |
|         1763974750 |
SELECT 1
Time: 0.420s

The reason KDE is so much faster when re-running queries is that after the initial run it only has to run the subqueries for the one-minute slices that occurred since the prior run, which are then appended to the earlier, cached subquery results. This is especially useful for time-series queries that you might run on a regular basis (we’ll see those a bit later). Just to be clear, though, subquery result caching is not the primary driver of fast query times. For the rest of the examples in this post we’ll be looking at uncached results to get an accurate characterization of “first run” response times.
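The caching behavior described above can be sketched in a few lines of Python. This is an illustration under stated assumptions, not KDE's code: the in-memory dictionary, the `scan_slice` stand-in, and the fixed dates are hypothetical, and the sketch assumes that only complete one-minute slices are ever cached.

```python
from datetime import datetime, timedelta

SLICE = timedelta(minutes=1)
slice_cache = {}          # slice start time -> cached subquery result
stats = {"scans": 0}      # how many slices were actually scanned

def scan_slice(slice_start):
    """Hypothetical stand-in for the expensive per-slice subquery."""
    stats["scans"] += 1
    return 175  # placeholder flow count

def count_flows(start, end):
    """Sum per-slice counts, scanning only slices that are not cached yet."""
    total = 0
    cursor = start
    while cursor < end:
        if cursor not in slice_cache:
            slice_cache[cursor] = scan_slice(cursor)
        total += slice_cache[cursor]
        cursor += SLICE
    return total

# First run: cold cache, every slice in the week is scanned.
first_end = datetime(2016, 1, 8, 12, 0)
count_flows(first_end - timedelta(weeks=1), first_end)
first_run_scans = stats["scans"]

# Second run five minutes later: the window slides, only new slices are scanned.
second_end = first_end + timedelta(minutes=5)
count_flows(second_end - timedelta(weeks=1), second_end)
second_run_scans = stats["scans"] - first_run_scans
print(first_run_scans, second_run_scans)
```

In this toy version the first run scans all 10,080 slices and the second scans only the 5 that are new, which mirrors why the re-run above returns so much faster.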

Counting bytes and packets

So far we’ve been looking at flow counts, which are important but only part of the story. What about counting bytes and packets? As shown in the following query (same router, same timespan), it’s equally straightforward to get those totals (for those of you who are keeping track, it comes to ~20 petabytes and ~23 trillion packets):

SELECT Sum(both_bytes) AS f_sum_both_bytes,
   Sum(both_pkts) AS f_sum_both_pkts
FROM big_backbone_router
WHERE i_fast_dataset = FALSE
  AND i_start_time > Now() - interval '1 week';
| f_sum_both_bytes   | f_sum_both_pkts   |
| 19996332614707365  | 23109672039934    |
SELECT 1
Time: 68.625s
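As a quick sanity check on those totals, the short Python calculation below converts the weekly byte count into an average bit rate and an average packet size. The only inputs are the two values from the result above; the variable names are ours.

```python
SECONDS_PER_WEEK = 7 * 24 * 60 * 60    # 604,800 seconds

total_bytes = 19996332614707365        # f_sum_both_bytes from the result above
total_pkts = 23109672039934            # f_sum_both_pkts from the result above

# Average rate over the week, in gigabits per second.
avg_gbps = total_bytes * 8 / SECONDS_PER_WEEK / 1e9
# Average packet size in bytes.
avg_pkt_size = total_bytes / total_pkts

print(f"average rate: {avg_gbps:.1f} Gbps")
print(f"average packet size: {avg_pkt_size:.0f} bytes")
```

The average works out to a little over 260 Gbps, which is in line with the ~250 Gbps figure quoted for this router.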

As most readers will already know, routers typically employ sampling to generate flow data; the flow records are based on 1-in-N packets sampled off the wire. KDE stores the sampling rate for each flow as a separate column, and the aggregation functions automatically normalize the byte and packet counts so that they represent the traffic that actually crossed the wire, rather than just a simple sum of values from the samples themselves.
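A minimal sketch of that normalization, assuming the simplest possible scheme (multiply each record's sampled counts by its own sampling rate before summing). The record layout and field names here are hypothetical and are not KDE's schema; the sample values are made up for the example.

```python
from dataclasses import dataclass

@dataclass
class FlowRecord:
    sampled_bytes: int   # bytes seen in the sampled packets
    sampled_pkts: int    # number of sampled packets
    sample_rate: int     # N, for 1-in-N packet sampling

def normalized_totals(flows):
    """Scale each record by its own sampling rate, then sum."""
    est_bytes = sum(f.sampled_bytes * f.sample_rate for f in flows)
    est_pkts = sum(f.sampled_pkts * f.sample_rate for f in flows)
    return est_bytes, est_pkts

flows = [
    FlowRecord(sampled_bytes=2576, sampled_pkts=2, sample_rate=1000),
    FlowRecord(sampled_bytes=52, sampled_pkts=1, sample_rate=1000),
    FlowRecord(sampled_bytes=52, sampled_pkts=1, sample_rate=500),
]

naive_bytes = sum(f.sampled_bytes for f in flows)   # ignores sampling entirely
est_bytes, est_pkts = normalized_totals(flows)
print(naive_bytes, est_bytes, est_pkts)
```

Storing the rate per record matters because devices, or even a single device over time, may sample at different rates; a single global multiplier would get the third record above wrong.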

Dataseries: Full or Fast?

Gathering and aggregating the data for a bytes/packets count is clearly more labor-intensive than counting flows, and that’s reflected in the response time of the query above. While 68 seconds is amazingly fast for a full table scan over 1.8 billion rows, it may not be quite as fast as we’d want for an interactive web GUI. KDE addresses this by creating at ingest a dataseries at each of two resolutions: One is the “Full” dataseries that we’ve been looking at (every flow record sent by the device). The other is what we call the “Fast” dataseries, which is a subsampled representation of the received data that retains all of the dimensionality but with fewer rows (see Resolution Overview for details).

When we run our previous query on the Fast dataseries the results return in less than one second and differ from the Full dataseries results above by only about 0.2% for bytes and 0.05% for packets, which is accurate enough to provide the insights we need in practice:

SELECT Sum(both_bytes) AS f_sum_both_bytes,
   Sum(both_pkts) AS f_sum_both_pkts
FROM big_backbone_router
WHERE i_start_time > Now() - interval '1 week';
| f_sum_both_bytes | f_sum_both_pkts |
| 19949403395561758 | 23097734283381 |
SELECT 1
Time: 0.782s
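To put a number on how close the two dataseries are, the following Python snippet computes the relative difference between the Full and Fast totals shown in the two results. Keep in mind that the two queries ran at slightly different moments, so their one-week windows are not identical and part of the gap comes from that.

```python
# Totals copied from the Full and Fast query results above.
full = {"bytes": 19996332614707365, "pkts": 23109672039934}
fast = {"bytes": 19949403395561758, "pkts": 23097734283381}

def relative_diff(reference, other):
    """Absolute difference as a fraction of the reference value."""
    return abs(reference - other) / reference

diffs = {name: relative_diff(full[name], fast[name]) for name in full}
for name, diff in diffs.items():
    print(f"{name}: {diff:.3%}")
```

Both differences come out well under one percent, while the response time drops from 68.6 seconds to under one second.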

By default, KDE auto-selects the Full or Fast dataseries depending on the timespan of the query. A typical workflow might start with a wide query (days to weeks), which defaults to the Fast dataseries, and progress to zooming in on a narrower time range (hours), which defaults to the Full dataseries. The user can override these defaults, both in the GUI and in SQL (see the “i_fast_dataset” parameter in our first bytes/packets example above). In this way, KDE strikes a great balance between query timespan, granularity, and response time, while still giving the user full control over those tradeoffs.
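The selection logic can be pictured roughly as follows. This is a hypothetical sketch: the post does not state where KDE draws the line between "wide" and "narrow" timespans, so the 24-hour cutoff below is an assumed value chosen only for illustration, and the function and parameter names are ours.

```python
from datetime import timedelta
from typing import Optional

# Assumed cutoff for illustration only; KDE's actual threshold is not given here.
FAST_THRESHOLD = timedelta(hours=24)

def choose_dataseries(timespan: timedelta, force_fast: Optional[bool] = None) -> str:
    """Pick "fast" or "full" by timespan unless the caller overrides the default."""
    if force_fast is not None:
        # Explicit override, analogous to setting i_fast_dataset in SQL.
        return "fast" if force_fast else "full"
    return "fast" if timespan > FAST_THRESHOLD else "full"

print(choose_dataseries(timedelta(weeks=1)))                    # wide query
print(choose_dataseries(timedelta(hours=3)))                    # zoomed-in query
print(choose_dataseries(timedelta(weeks=1), force_fast=False))  # user override
```

The override path corresponds to the earlier queries that set i_fast_dataset = FALSE to force a week-long query onto the Full dataseries.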

So far we’ve seen that KDE leverages a combination of techniques — rich schema, time-slice subqueries, results caching, parallel dataseries — to achieve extraordinarily fast query performance even over huge volumes of data. In part 2 of this series, we’ll continue our tour with a look at KDE’s extensive filtering and aggregation capabilities, and demonstrate how to drill down to timespans as fine as one minute that occurred as far back as 90 days. In the meantime there’s lots you can do to find out more about Kentik Detect: request a demo, sign up for a free trial, or simply contact us for further information. And if you find these kinds of discussions fascinating, bear in mind that we’re hiring.