SPL in Action

The Streams Processing Language (SPL) is the programming language of IBM Streams, used to build applications that process data in real time. A typical Streams workflow ingests live data from one or more sources, transforms the data to prepare it for analysis, applies analytics to derive insights and make decisions, and sends the relevant results through sink operators to downstream systems.
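The four stages can be pictured with a small plain-Python sketch. It is an illustration only, not SPL: Python generators stand in for operators, and the function names, the threshold check used as the "analytics" step, and the sample records are all hypothetical.

```python
# Hypothetical sketch of the ingest -> transform -> analyze -> sink workflow.
# Each stage consumes an iterator and yields one item at a time, loosely
# mirroring how tuples flow from one SPL operator to the next.
from typing import Iterable, Iterator


def ingest(lines: Iterable[str]) -> Iterator[float]:
    """Source stage: parse raw text records into numbers, skipping blank lines."""
    for line in lines:
        line = line.strip()
        if line:
            yield float(line)


def transform(values: Iterable[float]) -> Iterator[tuple[int, float]]:
    """Transform stage: attach a sequential index starting at 1."""
    for index, value in enumerate(values, start=1):
        yield index, value


def analyze(records: Iterable[tuple[int, float]], threshold: float) -> Iterator[tuple[int, float, bool]]:
    """Analytics stage: flag values above a threshold (a stand-in for real analytics)."""
    for index, value in records:
        yield index, value, value > threshold


def sink(results: Iterable[tuple[int, float, bool]]) -> list[str]:
    """Sink stage: format each result; a real sink would write to a file or service."""
    return [f"{i},{v},{flag}" for i, v, flag in results]


raw = ["10.5", "", "42.0", "7.25"]
print(sink(analyze(transform(ingest(raw)), threshold=20.0)))
# ['1,10.5,False', '2,42.0,True', '3,7.25,False']
```

Because every stage is lazy, a record moves through all four stages before the next one is read, which is the property that lets a streaming pipeline handle unbounded input.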

Below are samples showing the Streams workflow in real-world applications:

Network Forecaster

This sample demonstrates the AutoForecaster2 operator from the com.ibm.streams.timeseries.modeling namespace, which adds time-series forecasting to an application. The operator analyzes the incoming data to select the best forecasting algorithm automatically. Here it is applied to network load: the composite reads load values from a file, attaches a sequential time index to each value, trains on the first 100 samples and forecasts 20 steps ahead, and writes the results both to a file and to standard output.
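To make the idea of automatic algorithm selection concrete, here is a simplified plain-Python sketch. It is not how AutoForecaster2 works internally: the three candidate models (naive, moving average, linear trend), the walk-forward mean-absolute-error criterion, and the names auto_forecast, init_samples, and step_ahead are assumptions, chosen only to loosely echo the initSamples and stepAhead parameters used in the sample.

```python
# Hypothetical illustration of picking a forecasting model by backtest error.
# NOT the AutoForecaster2 algorithm; candidates and metric are assumptions.
from statistics import mean
from typing import Callable, Dict, List, Tuple

# A forecaster maps (history, horizon) to `horizon` predicted values.
Forecaster = Callable[[List[float], int], List[float]]


def naive(history: List[float], horizon: int) -> List[float]:
    """Repeat the last observed value."""
    return [history[-1]] * horizon


def moving_average(history: List[float], horizon: int) -> List[float]:
    """Repeat the mean of (up to) the last five observations."""
    return [mean(history[-5:])] * horizon


def linear_trend(history: List[float], horizon: int) -> List[float]:
    """Extend the average step between the first and last observations."""
    if len(history) < 2:
        return naive(history, horizon)
    slope = (history[-1] - history[0]) / (len(history) - 1)
    return [history[-1] + slope * step for step in range(1, horizon + 1)]


# Candidates are tried in this order; on a tie, the earliest one wins.
CANDIDATES: Dict[str, Forecaster] = {
    "naive": naive,
    "moving_average": moving_average,
    "linear_trend": linear_trend,
}


def backtest_error(model: Forecaster, series: List[float], min_history: int = 2) -> float:
    """Mean absolute one-step-ahead error, predicting each point from the prefix
    before it. Requires more than `min_history` points."""
    errors = [abs(model(series[:i], 1)[0] - series[i]) for i in range(min_history, len(series))]
    return mean(errors)


def auto_forecast(series: List[float], init_samples: int, step_ahead: int) -> Tuple[str, List[float]]:
    """Pick the candidate with the lowest backtest error on the first
    `init_samples` points, then forecast `step_ahead` steps past the series."""
    training = series[:init_samples]
    best = min(CANDIDATES, key=lambda name: backtest_error(CANDIDATES[name], training))
    return best, CANDIDATES[best](series, step_ahead)


name, forecast = auto_forecast([float(x) for x in range(10)], init_samples=10, step_ahead=3)
print(name, forecast)  # linear_trend [10.0, 11.0, 12.0]
```

For a steadily rising series the linear-trend candidate has zero backtest error, so it is selected; for a flat series all three candidates tie and the first one, naive, wins. A production forecaster would typically also re-evaluate its choice as new data arrives.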


namespace application ;
 
// Import the AutoForecaster2 operator for time-series forecasting
use com.ibm.streams.timeseries.modeling::AutoForecaster2 ;
 
/*
* Main composite AutoForecasterSamples orchestrates:
* 1. Reading the input data stream from a file
* 2. Adding a time index to each data point
* 3. Forecasting future values using AutoForecaster2
* 4. Writing the forecasted results to a file and printing them to standard output
*/
composite AutoForecasterSamples
{
    graph
        /*
         * NetloadSource: Reads float64 data values (named 'inputData') from the
         * file netload.out. FileSource parses the file as CSV by default, so each
         * line becomes one tuple of type stream<float64 inputData>.
         */
        (stream<float64 inputData> NetloadData) as NetloadSource = FileSource()
        {
            param
                // Path to the input file in the data/ subdirectory of the application directory
                file : getApplicationDir() + "/data/netload.out" ;
        }
 
        /*
         * AddTimeOp: Adds a monotonically increasing timestamp (t) to each incoming
         * data tuple. The 'Custom' operator uses a mutable uint64 counter 'cnt' that 
         * increments on every tuple to simulate time progression.
         * 
         * Output stream: (stream<uint64 t, float64 inputData>) NetloadTimeData
         */
        (stream<uint64 t, float64 inputData> NetloadTimeData) as AddTimeOp =
            Custom(NetloadData)
        {
            logic
                state :
                {
                    // cnt holds the next time index, starting at 1
                    // (the 'ul' suffix makes the literal a uint64, matching the type of cnt)
                    mutable uint64 cnt = 1ul ;
                }
 
                onTuple NetloadData :
                {
                    // For each incoming tuple, submit a new tuple holding the current
                    // time index (t) and the original value (inputData). The post-increment
                    // cnt++ assigns the current value to t and then advances cnt.
                    submit({ t = cnt++, inputData = inputData }, NetloadTimeData) ;
                }
        }
 
        /*
         * ForecastingOperator: The AutoForecaster2 operator takes the time-indexed data
         * from NetloadTimeData and produces forecasts. It is configured with:
         *  - inputTimeSeries: the attribute to forecast (inputData)
         *  - initSamples: the number of initial observations used to train the model
         *  - stepAhead: the forecasting horizon, in steps
         *  - algorithm: the forecasting method (Dynamic in this case)
         *  - inputTimestamp: the timestamp attribute (t)
         *
         * The output stream carries the input attributes (t, inputData) plus:
         *  - forecastedTimestamp: the timestamp of the predicted value
         *  - forecastedResult: the forecasted value
         */
        (stream<uint64 t, float64 inputData, uint64 forecastedTimestamp, float64 forecastedResult>
            ForecastedResults) as ForecastingOperator
            = AutoForecaster2(NetloadTimeData as inPort0Alias)
        {
            param
                // Identifies the numeric field in the incoming tuples to forecast
                inputTimeSeries : inputData ;
                // Number of samples to initially train the model
                initSamples : 100u ;
                // How many steps ahead to predict
                stepAhead : 20u ;
                // The type of forecasting algorithm; 'Dynamic' updates the model adaptively
                algorithm : Dynamic ;
                // Indicates which field is used as the input timestamp
                inputTimestamp : t ;
 
            output
                // Set the forecast attributes with the operator's output functions;
                // t and inputData are forwarded unchanged from the input tuple
                ForecastedResults :
                    forecastedResult = forecastedTimeSeriesStep(),
                    forecastedTimestamp = forecastedTimestamp() ;
        }
 
        /*
         * FileSinkOp: Writes the forecasted results (ForecastedResults) to an output file.
         * This allows for offline analysis or archiving of the forecasting output.
         */
        () as FileSinkOp = FileSink(ForecastedResults)
        {
            param
                // File path to store forecasted results
                file : "/tmp/result.txt" ;
        }
 
        /*
         * Printer: A Custom operator that prints each forecasted tuple to standard output.
         * Useful for observing the results in real-time or during development/debugging.
         */
        () as Printer = Custom(ForecastedResults as in0)
        {
            logic
                onTuple in0 :
                {
                    // Prints the full tuple (timestamp, input data, forecasted timestamp, forecasted result)
                    println(in0) ;
                }
        }
 
} // End of composite AutoForecasterSamples
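The state and onTuple clauses in AddTimeOp behave like an object that keeps a counter between calls. The plain-Python analogue below is hypothetical (AddTimeOp, on_tuple, and NetloadTime are illustrative names, not a Streams API); it shows that, with post-increment, the first tuple receives t = 1 and each later tuple receives the next integer.

```python
# Hypothetical Python analogue of the AddTimeOp Custom operator.
# `cnt` plays the role of the SPL state variable; on_tuple plays the role
# of the `onTuple NetloadData` clause. Not part of any Streams API.
from typing import List, NamedTuple


class NetloadTime(NamedTuple):
    """Mirrors the SPL tuple type tuple<uint64 t, float64 inputData>."""
    t: int
    inputData: float


class AddTimeOp:
    def __init__(self) -> None:
        self.cnt = 1  # next time index; starts at 1 like `mutable uint64 cnt = 1ul`

    def on_tuple(self, input_data: float) -> NetloadTime:
        # Same post-increment semantics as `t = cnt++`: emit the current value, then advance it.
        out = NetloadTime(t=self.cnt, inputData=input_data)
        self.cnt += 1
        return out


op = AddTimeOp()
stamped: List[NetloadTime] = [op.on_tuple(v) for v in (3.2, 4.8, 5.1)]
print([s.t for s in stamped])  # [1, 2, 3]
```

One difference worth keeping in mind: Python integers never overflow, whereas the SPL counter is a fixed-width uint64.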