Manipulating and Analyzing Data

TimeSeriesDataFrame

A ts.flint.TimeSeriesDataFrame is a time-series aware version of a pyspark.sql.DataFrame. Being time-series aware, it has optimized versions of some operations like joins, and also some new features like temporal joins.

Like a normal pyspark.sql.DataFrame, a ts.flint.TimeSeriesDataFrame is a collection of pyspark.sql.Row objects, but one that must always have a time column. The rows are always sorted by time, and the API provides special join and aggregation operations that take advantage of that temporal locality.
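A ts.flint.TimeSeriesDataFrame is typically created by reading data through a ts.flint.FlintContext. The following is a minimal sketch, assuming an existing SQLContext named sqlContext and an existing pyspark.sql.DataFrame named spark_df that already has a time column (both names are illustrative placeholders):

>>> from ts.flint import FlintContext
>>> flintContext = FlintContext(sqlContext)
>>> # Returns a TimeSeriesDataFrame with rows sorted by the time column
>>> df = flintContext.read.dataframe(spark_df)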

Note on Dataframes and Immutability

Note

All the methods on ts.flint.TimeSeriesDataFrame that appear to “add” something do not modify the target of the method. Instead, they return a new ts.flint.TimeSeriesDataFrame which shares most of its data with the original one. In fact, this is true of all Spark transformations.

Therefore, don’t discard the result of one of these calls; assign it to a new variable instead. That way, you can always go back and refer to a dataframe as it was before you transformed it.

Bad Example:

This completely discards the results of these operations: df is left unchanged, so any later code that uses df will be working with the wrong data.

>>> df.select('time', 'id', 'openPrice', 'closePrice')
>>> df.addWindows(windows.past_absolute_time('5d'), key='id')

Okay Example:

This is going to compute the right thing, but if you decide you want to try something without that window, you have to clear everything and start over. In addition, if you run a notebook cell like this multiple times, you’ll stack multiple layers of the same transformations onto your dataframe.

>>> df = df.select('time', 'id', 'openPrice', 'closePrice')
>>> df = df.addWindows(windows.past_absolute_time('5d'), key='id')

Good Example:

This is the best way to work with ts.flint.TimeSeriesDataFrame and pyspark.sql.DataFrame. You can run this cell any number of times and you’ll always get the same thing. Furthermore, you can now chain multiple things off price_df later, without re-reading raw_df.

>>> price_df = raw_df.select('time', 'id', 'openPrice', 'closePrice')
>>> window_df_7d = price_df.addWindows(windows.past_absolute_time('7d'), key='id')
>>> window_df_14d = price_df.addWindows(windows.past_absolute_time('14d'), key='id')

Summarizers

In Flint, we specify the summarizations we want to do in terms of answering two orthogonal questions:

  • What aggregation/summarization function do you want to apply to a given set of rows?
  • Which rows do you want to aggregate/summarize together?

The functions in ts.flint.summarizers are how you specify which aggregation functions to apply. They are suitable for passing to methods like ts.flint.TimeSeriesDataFrame.summarize() and ts.flint.TimeSeriesDataFrame.summarizeCycles(), which answer the second question: which rows should be aggregated together.

The Flint summarizer library augments the analysis capabilities already available to a normal pyspark.sql.DataFrame, such as those in pyspark.sql.functions.
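As a sketch of how the two questions fit together, using price_df from the good example above: summarizers.mean() names the function to apply (the first question), while the choice between summarize() and summarizeCycles() determines which rows are grouped together (the second question).

>>> from ts.flint import summarizers
>>> # One summary row per id, averaging closePrice over the whole dataframe:
>>> mean_df = price_df.summarize(summarizers.mean('closePrice'), key='id')
>>> # One summary row per id per cycle (rows sharing the same timestamp):
>>> count_df = price_df.summarizeCycles(summarizers.count(), key='id')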