Structured Streaming

Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine takes care of running it incrementally and continuously, updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame API to express streaming aggregations, event-time windows, stream-to-batch joins, and so on. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and write-ahead logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
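
For example, the sketch below counts events per 5-second event-time window using the built-in rate source and console sink (both intended for testing). It is a minimal illustration, assuming only a local Spark installation:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import window

    spark = SparkSession.builder.appName("streaming-example").getOrCreate()

    # The "rate" source emits (timestamp, value) rows, here 10 per second.
    events = spark.readStream.format("rate").option("rowsPerSecond", 10).load()

    # Express the computation exactly as on a static DataFrame:
    # count events per 5-second event-time window.
    counts = events.groupBy(window(events.timestamp, "5 seconds")).count()

    # Aggregations written to the console sink need "complete" or "update" mode.
    query = counts.writeStream.outputMode("complete").format("console").start()
    query.awaitTermination()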

Note that you can leverage almost all Spark SQL APIs in PySpark. This page describes the Structured Streaming-specific features.

Core Classes

The classes below are the core classes of the Structured Streaming API in PySpark.

DataStreamReader(spark)

Interface used to load a streaming DataFrame from external storage systems (e.g. file systems, key-value stores). Use SparkSession.readStream to access this.

DataStreamWriter(df)

Interface used to write a streaming DataFrame to external storage systems (e.g. file systems, key-value stores). Use DataFrame.writeStream to access this.

ForeachBatchFunction(sql_ctx, func)

This is the Python implementation of the Java interface ‘ForeachBatchFunction’.

StreamingQuery(jsq)

A handle to a query that is executing continuously in the background as new data arrives.

StreamingQueryException(desc, stackTrace[, …])

Exception that stopped a StreamingQuery.

StreamingQueryManager(jsqm)

A class to manage all the active StreamingQuery instances.
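
You rarely construct these classes directly; they are handed out by a SparkSession or DataFrame. A rough sketch of how they relate (assuming an existing SparkSession named spark):

    reader = spark.readStream                  # DataStreamReader
    df = reader.format("rate").load()          # streaming DataFrame

    writer = df.writeStream                    # DataStreamWriter
    query = writer.format("console").start()   # StreamingQuery

    manager = spark.streams                    # StreamingQueryManager
    print(query.id, query.isActive)
    query.stop()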

Input and Output

DataStreamReader.csv(path[, schema, sep, …])

Loads a CSV file stream and returns the result as a DataFrame.

DataStreamReader.format(source)

Specifies the input data source format.

DataStreamReader.json(path[, schema, …])

Loads a JSON file stream and returns the results as a DataFrame.

DataStreamReader.load([path, format, schema])

Loads a data stream from a data source and returns it as a DataFrame.

DataStreamReader.option(key, value)

Adds an input option for the underlying data source.

DataStreamReader.options(**options)

Adds input options for the underlying data source.

DataStreamReader.orc(path[, mergeSchema, …])

Loads an ORC file stream, returning the result as a DataFrame.

DataStreamReader.parquet(path[, …])

Loads a Parquet file stream, returning the result as a DataFrame.

DataStreamReader.schema(schema)

Specifies the input schema.

DataStreamReader.text(path[, wholetext, …])

Loads a text file stream and returns a DataFrame whose schema starts with a string column named “value”, followed by partitioned columns if there are any.
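
The reader methods above chain into a single load; a sketch of reading a directory of CSV files follows (the path and columns are hypothetical; note that file sources require an explicit schema, as schema inference is not performed on streams by default):

    from pyspark.sql.types import StructType, StructField, StringType, IntegerType

    schema = StructType([
        StructField("name", StringType()),
        StructField("age", IntegerType()),
    ])

    # The input directory is hypothetical; new files dropped into it
    # are picked up as the stream runs.
    people = (
        spark.readStream
        .schema(schema)
        .option("header", "true")
        .csv("/tmp/streaming-input")
    )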

DataStreamWriter.foreach(f)

Sets the output of the streaming query to be processed using the provided writer f.

DataStreamWriter.foreachBatch(func)

Sets the output of the streaming query to be processed using the provided function.

DataStreamWriter.format(source)

Specifies the underlying output data source.

DataStreamWriter.option(key, value)

Adds an output option for the underlying data source.

DataStreamWriter.options(**options)

Adds output options for the underlying data source.

DataStreamWriter.outputMode(outputMode)

Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.

DataStreamWriter.partitionBy(*cols)

Partitions the output by the given columns on the file system.

DataStreamWriter.queryName(queryName)

Specifies the name of the StreamingQuery that can be started with start().

DataStreamWriter.start([path, format, …])

Streams the contents of the DataFrame to a data source.

DataStreamWriter.trigger([processingTime, …])

Sets the trigger for the stream query.
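
Correspondingly, the writer methods chain into start(). Continuing the reader sketch above, this writes each micro-batch with foreachBatch; the paths are hypothetical, and the checkpointLocation option is what backs the fault-tolerance guarantees described in the introduction:

    def write_batch(batch_df, batch_id):
        # Each micro-batch arrives as a static DataFrame, so any batch
        # writer can be used; this sketch appends Parquet files.
        batch_df.write.mode("append").parquet("/tmp/streaming-output")

    query = (
        people.writeStream
        .queryName("people_ingest")
        .trigger(processingTime="10 seconds")
        .option("checkpointLocation", "/tmp/checkpoints/people")
        .foreachBatch(write_batch)
        .start()
    )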

Query Management

StreamingQuery.awaitTermination([timeout])

Waits for the termination of this query, either by query.stop() or by an exception.

StreamingQuery.exception()

Returns the StreamingQueryException if the query was terminated by an exception, or None.

StreamingQuery.explain([extended])

Prints the (logical and physical) plans to the console for debugging purposes.

StreamingQuery.id

Returns the unique id of this query that persists across restarts from checkpoint data.

StreamingQuery.isActive

Whether this streaming query is currently active or not.

StreamingQuery.lastProgress

Returns the most recent StreamingQueryProgress update of this streaming query, or None if there were no progress updates.

StreamingQuery.name

Returns the user-specified name of the query, or None if not specified.

StreamingQuery.processAllAvailable()

Blocks until all available data in the source has been processed and committed to the sink.

StreamingQuery.recentProgress

Returns an array of the most recent StreamingQueryProgress updates for this query.

StreamingQuery.runId

Returns the unique id of this query that does not persist across restarts.

StreamingQuery.status

Returns the current status of the query.

StreamingQuery.stop()

Stops this streaming query.

StreamingQueryManager.active

Returns a list of active queries associated with this SQLContext.

StreamingQueryManager.awaitAnyTermination([…])

Wait until any of the queries on the associated SQLContext has terminated since the creation of the context, or since resetTerminated() was called.

StreamingQueryManager.get(id)

Returns an active query from this SQLContext or throws an exception if an active query with this name doesn’t exist.

StreamingQueryManager.resetTerminated()

Forget about past terminated queries so that awaitAnyTermination() can be used again to wait for new terminations.
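
A short sketch tying these query-management APIs together, assuming the spark session and query handle from the earlier sketches:

    print(query.status)        # current status dict of the query
    print(query.lastProgress)  # most recent progress dict, or None

    # Enumerate and look up queries through the manager.
    for q in spark.streams.active:
        print(q.name, q.id, q.runId)
    same_query = spark.streams.get(query.id)

    # Block until any active query terminates, or 60 seconds elapse.
    spark.streams.awaitAnyTermination(timeout=60)
    spark.streams.resetTerminated()

    query.stop()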