PySpark is the set of Spark APIs for the Python language. It not only lets you write Spark applications using Python APIs but also provides the PySpark shell so you can interactively analyze your data in a distributed environment. PySpark supports most of Apache Spark's features.
General Execution: Spark Core
Spark Core is the underlying general execution engine for the Spark platform that all other functionality is built on top of. It provides in-memory computing capabilities.
Structured Data: Spark SQL
Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as distributed SQL query engine.
Streaming Analytics: Spark Streaming
Running on top of Spark, Spark Streaming enables powerful interactive and analytical applications across both streaming and historical data, while inheriting Spark’s ease of use and fault tolerance characteristics.
Machine Learning: MLlib
Machine learning has quickly emerged as a critical piece in mining Big Data for actionable insights. Built on top of Spark, MLlib is a scalable machine learning library that delivers both high-quality algorithms (e.g., multiple iterations to increase accuracy) and blazing speed (up to 100x faster than MapReduce).
Table of contents:
Interactive Analysis with the Spark Shell
Self-contained Application
This tutorial provides a quick introduction to using Spark. We will first introduce the API through Spark’s interactive shell (in Python or Scala), then show how to write applications in Java, Scala, and Python.
To follow along with this guide, first, download a packaged release of Spark from the Spark website. Since we won’t be using HDFS, you can download a package for any version of Hadoop.
Note that, before Spark 2.0, the main programming interface of Spark was the Resilient Distributed Dataset (RDD). After Spark 2.0, RDDs are replaced by Dataset, which is strongly-typed like an RDD, but with richer optimizations under the hood. The RDD interface is still supported, and you can find a more detailed reference in the RDD programming guide. However, we highly recommend switching to Dataset, which has better performance than RDD. See the SQL programming guide for more information about Dataset.
Spark’s shell provides a simple way to learn the API, as well as a powerful tool to analyze data interactively. It is available in either Scala (which runs on the Java VM and is thus a good way to use existing Java libraries) or Python. Start it by running the following in the Spark directory:
./bin/pyspark
Or if PySpark is installed with pip in your current environment:
pyspark
Spark’s primary abstraction is a distributed collection of items called a Dataset. Datasets can be created from Hadoop InputFormats (such as HDFS files) or by transforming other Datasets. Due to Python’s dynamic nature, we don’t need the Dataset to be strongly-typed in Python. As a result, all Datasets in Python are Dataset[Row], and we call it DataFrame to be consistent with the data frame concept in Pandas and R. Let’s make a new DataFrame from the text of the README file in the Spark source directory:
>>> textFile = spark.read.text("README.md")
You can get values from the DataFrame directly by calling some actions, or transform the DataFrame to get a new one. For more details, please read the API doc for pyspark.sql.DataFrame.
>>> textFile.count()  # Number of rows in this DataFrame
126

>>> textFile.first()  # First row in this DataFrame
Row(value=u'# Apache Spark')
Now let’s transform this DataFrame to a new one. We call filter to return a new DataFrame with a subset of the lines in the file.
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
We can chain together transformations and actions:
>>> textFile.filter(textFile.value.contains("Spark")).count()  # How many lines contain "Spark"?
15
Dataset actions and transformations can be used for more complex computations. Let’s say we want to find the line with the most words:
>>> from pyspark.sql.functions import *
>>> textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max(col("numWords"))).collect()
[Row(max(numWords)=15)]
This first maps a line to an integer value and aliases it as “numWords”, creating a new DataFrame. agg is called on that DataFrame to find the largest word count. The arguments to select and agg are both Column; we can use df.colName to get a column from a DataFrame. We can also import pyspark.sql.functions, which provides a lot of convenient functions to build a new Column from an old one.
One common data flow pattern is MapReduce, as popularized by Hadoop. Spark can implement MapReduce flows easily:
>>> wordCounts = textFile.select(explode(split(textFile.value, "\s+")).alias("word")).groupBy("word").count()
Here, we use the explode function in select, to transform a Dataset of lines to a Dataset of words, and then combine groupBy and count to compute the per-word counts in the file as a DataFrame of 2 columns: “word” and “count”. To collect the word counts in our shell, we can call collect:
>>> wordCounts.collect()
[Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]
Spark also supports pulling data sets into a cluster-wide in-memory cache. This is very useful when data is accessed repeatedly, such as when querying a small “hot” dataset or when running an iterative algorithm like PageRank. As a simple example, let’s mark our linesWithSpark dataset to be cached:
>>> linesWithSpark.cache()

>>> linesWithSpark.count()
15

>>> linesWithSpark.count()
15
It may seem silly to use Spark to explore and cache a 100-line text file. The interesting part is that these same functions can be used on very large data sets, even when they are striped across tens or hundreds of nodes. You can also do this interactively by connecting bin/pyspark to a cluster, as described in the RDD programming guide.
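For instance, assuming a standalone cluster whose master URL is spark://host:7077 (a placeholder), the shell could be started against it like this:

./bin/pyspark --master spark://host:7077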
Now we will show how to write an application using the Python API (PySpark).
If you are building a packaged PySpark application or library you can add it to your setup.py file as:
install_requires=[
    'pyspark=={site.SPARK_VERSION}'
]
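For context, a minimal setup.py built around this dependency might look like the sketch below; the package name and pinned version are placeholders rather than part of the official packaging guide:

# setup.py -- minimal packaging sketch for a PySpark application (names are placeholders)
from setuptools import setup, find_packages

setup(
    name="my_pyspark_app",      # hypothetical package name
    version="0.1.0",
    packages=find_packages(),
    install_requires=[
        "pyspark==3.0.0",       # pin to the Spark version you run against
    ],
)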
As an example, we’ll create a simple Spark application, SimpleApp.py:
"""SimpleApp.py""" from pyspark.sql import SparkSession logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system spark = SparkSession.builder.appName("SimpleApp").getOrCreate() logData = spark.read.text(logFile).cache() numAs = logData.filter(logData.value.contains('a')).count() numBs = logData.filter(logData.value.contains('b')).count() print("Lines with a: %i, lines with b: %i" % (numAs, numBs)) spark.stop()
This program just counts the number of lines containing ‘a’ and the number containing ‘b’ in a text file. Note that you’ll need to replace YOUR_SPARK_HOME with the location where Spark is installed. As with the Scala and Java examples, we use a SparkSession to create Datasets. For applications that use custom classes or third-party libraries, we can also add code dependencies to spark-submit through its --py-files argument by packaging them into a .zip file (see spark-submit --help for details). SimpleApp is simple enough that we do not need to specify any code dependencies.
We can run this application using the bin/spark-submit script:
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --master local[4] \
  SimpleApp.py
...
Lines with a: 46, Lines with b: 23
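If the application did have extra code dependencies, they could be shipped with the --py-files option described above; a hypothetical invocation might look like this (deps.zip is a placeholder archive):

# Ship additional Python code packaged as a .zip alongside the application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --master local[4] \
  --py-files deps.zip \
  SimpleApp.py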
If you have PySpark pip installed into your environment (e.g., pip install pyspark), you can run your application with the regular Python interpreter or use the provided spark-submit script, whichever you prefer.
# Use the Python interpreter to run your application
$ python SimpleApp.py
...
Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer data between JVM and Python processes. This currently is most beneficial to Python users that work with Pandas/NumPy data. Its usage is not automatic and might require some minor changes to configuration or code to take full advantage and ensure compatibility. This guide will give a high-level description of how to use Arrow in Spark and highlight any differences when working with Arrow-enabled data.
To use Apache Arrow in PySpark, the recommended version of PyArrow should be installed. If you install PySpark using pip, then PyArrow can be brought in as an extra dependency of the SQL module with the command pip install pyspark[sql]. Otherwise, you must ensure that PyArrow is installed and available on all cluster nodes. You can install using pip or conda from the conda-forge channel. See PyArrow installation for details.
pip install pyspark[sql]
Arrow is available as an optimization when converting a Spark DataFrame to a Pandas DataFrame using the call toPandas() and when creating a Spark DataFrame from a Pandas DataFrame with createDataFrame(pandas_df). To use Arrow when executing these calls, users need to first set the Spark configuration spark.sql.execution.arrow.pyspark.enabled to true. This is disabled by default.
In addition, the optimizations enabled by spark.sql.execution.arrow.pyspark.enabled can automatically fall back to a non-Arrow implementation if an error occurs before the actual computation within Spark. This behavior is controlled by spark.sql.execution.arrow.pyspark.fallback.enabled.
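As a small sketch, the fallback behavior can be toggled at runtime; the value shown here is only an example:

# Allow automatic fallback to the non-Arrow path if Arrow conversion fails
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "true")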
import numpy as np
import pandas as pd

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))

# Create a Spark DataFrame from a Pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()
Using the above optimizations with Arrow will produce the same results as when Arrow is not enabled. Note that even with Arrow, toPandas() results in the collection of all records in the DataFrame to the driver program and should be done on a small subset of the data. Not all Spark data types are currently supported and an error can be raised if a column has an unsupported type, see Supported SQL Types. If an error occurs during createDataFrame(), Spark will fall back to create the DataFrame without Arrow.
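For example, one way to keep the driver from collecting too much data is to bound the result explicitly before converting; the limit of 1000 rows below is arbitrary:

# Collect only a bounded number of rows to the driver as a Pandas DataFrame
small_pdf = df.limit(1000).toPandas()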
Pandas UDFs are user-defined functions that are executed by Spark using Arrow to transfer data and Pandas to work with the data, which allows vectorized operations. A Pandas UDF is defined using pandas_udf as a decorator or by wrapping the function, and no additional configuration is required. In general, a Pandas UDF behaves like a regular PySpark function API.
Before Spark 3.0, Pandas UDFs were defined with PandasUDFType. From Spark 3.0 with Python 3.6+, you can also use Python type hints. Using Python type hints is preferred, and PandasUDFType will be deprecated in a future release.
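For comparison, here is a sketch of the two styles side by side; the function itself is a trivial placeholder:

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Spark 3.0+ style: the UDF type is inferred from Python type hints
@pandas_udf("long")
def plus_one(s: pd.Series) -> pd.Series:
    return s + 1

# Pre-3.0 style: the UDF type is given explicitly via PandasUDFType (to be deprecated)
@pandas_udf("long", PandasUDFType.SCALAR)
def plus_one_old(s):
    return s + 1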
Note that the type hint should use pandas.Series in all cases, with one variant: pandas.DataFrame should be used as the input or output type hint instead when the corresponding input or output column is of StructType. The following example shows a Pandas UDF that takes a long column, a string column, and a struct column, and outputs a struct column. It requires the function to specify the type hints of pandas.Series and pandas.DataFrame as below:
import pandas as pd

from pyspark.sql.functions import pandas_udf


@pandas_udf("col1 string, col2 long")
def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    s3['col2'] = s1 + s2.str.len()
    return s3


# Create a Spark DataFrame that has three columns including a struct column.
df = spark.createDataFrame(
    [[1, "a string", ("a nested string",)]],
    "long_col long, string_col string, struct_col struct<col1:string>")

df.printSchema()
# root
# |-- long_col: long (nullable = true)
# |-- string_col: string (nullable = true)
# |-- struct_col: struct (nullable = true)
# |    |-- col1: string (nullable = true)

df.select(func("long_col", "string_col", "struct_col")).printSchema()
# |-- func(long_col, string_col, struct_col): struct (nullable = true)
# |    |-- col1: string (nullable = true)
# |    |-- col2: long (nullable = true)
The following sections describe the supported combinations of type hints. For simplicity, the pandas.DataFrame variant is omitted.
The type hint can be expressed as pandas.Series, … -> pandas.Series.
Using pandas_udf with a function that has such type hints creates a Pandas UDF where the given function takes one or more pandas.Series and outputs one pandas.Series. The output of the function should always be the same length as the input. Internally, PySpark executes a Pandas UDF by splitting columns into batches, calling the function for each batch as a subset of the data, and then concatenating the results together.
The following example shows how to create a Pandas UDF that computes the product of two columns.
import pandas as pd

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType


# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local Pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+
For detailed usage, please see pyspark.sql.functions.pandas_udf.
The type hint can be expressed as Iterator[pandas.Series] -> Iterator[pandas.Series].
Using pandas_udf with a function that has such type hints creates a Pandas UDF where the given function takes an iterator of pandas.Series and outputs an iterator of pandas.Series. The length of the entire output from the function should be the same as the length of the entire input; therefore, it can prefetch the data from the input iterator as long as the lengths are the same. In this case, the created Pandas UDF requires one input column when it is called. To use multiple input columns, a different type hint is required; see Iterator of Multiple Series to Iterator of Series.
It is also useful when the UDF execution requires initializing some state, although internally it works identically to the Series to Series case. The pseudocode below illustrates the example.
@pandas_udf("long")
def calculate(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Do some expensive initialization with a state
    state = very_expensive_initialization()
    for x in iterator:
        # Use that state for the whole iterator.
        yield calculate_with_state(x, state)

df.select(calculate("value")).show()
The following example shows how to create this Pandas UDF:
from typing import Iterator

import pandas as pd

from pyspark.sql.functions import pandas_udf

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)


# Declare the function and create the UDF
@pandas_udf("long")
def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in iterator:
        yield x + 1

df.select(plus_one("x")).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+
The type hint can be expressed as Iterator[Tuple[pandas.Series, ...]] -> Iterator[pandas.Series].
Using pandas_udf with a function that has such type hints creates a Pandas UDF where the given function takes an iterator of a tuple of multiple pandas.Series and outputs an iterator of pandas.Series. In this case, when the Pandas UDF is called it requires as many input columns as there are series in the tuple. Otherwise, it has the same characteristics and restrictions as the Iterator of Series to Iterator of Series case.
from typing import Iterator, Tuple

import pandas as pd

from pyspark.sql.functions import pandas_udf

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)


# Declare the function and create the UDF
@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+
The type hint can be expressed as pandas.Series, … -> Any.
Using pandas_udf with a function that has such type hints creates a Pandas UDF similar to PySpark's aggregate functions. The given function takes pandas.Series and returns a scalar value. The return type should be a primitive data type, and the returned scalar can be either a Python primitive type, e.g., int or float, or a NumPy data type, e.g., numpy.int64 or numpy.float64. Any should ideally be a specific scalar type accordingly.
This UDF can also be used with groupBy().agg() and pyspark.sql.Window. It defines an aggregation from one or more pandas.Series to a scalar value, where each pandas.Series represents a column within the group or window.
Note that this type of UDF does not support partial aggregation, and all data for a group or window will be loaded into memory. Also, only unbounded windows are supported with grouped aggregate Pandas UDFs currently. The following example shows how to use this type of UDF to compute the mean with group-by and window operations:
import pandas as pd

from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))


# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+
Pandas Function APIs can directly apply a Python native function to a whole DataFrame by using Pandas instances. Internally they work similarly to Pandas UDFs, using Arrow to transfer data and Pandas to work with the data, which allows vectorized operations. However, a Pandas Function API behaves as a regular API on a PySpark DataFrame instead of a Column, and Python type hints in Pandas Function APIs are optional and do not affect how they work internally at this moment, although they may be required in the future.
From Spark 3.0, the grouped map Pandas UDF is categorized as a separate Pandas Function API, DataFrame.groupby().applyInPandas(). It is still possible to use it with PandasUDFType and DataFrame.groupby().apply() as before; however, it is preferred to use DataFrame.groupby().applyInPandas() directly. Using PandasUDFType will be deprecated in the future.
Grouped map operations with Pandas instances are supported by DataFrame.groupby().applyInPandas(), which requires a Python function that takes a pandas.DataFrame and returns another pandas.DataFrame. It maps each group to a pandas.DataFrame in the Python function.
This API implements the “split-apply-combine” pattern which consists of three steps:
Split the data into groups by using DataFrame.groupBy.
Apply a function on each group. The input and output of the function are both pandas.DataFrame. The input data contains all the rows and columns for each group.
Combine the results into a new PySpark DataFrame.
To use groupBy().applyInPandas(), the user needs to define the following:
A Python function that defines the computation for each group.
A StructType object or a string that defines the schema of the output PySpark DataFrame.
The column labels of the returned pandas.DataFrame must either match the field names in the defined output schema if specified as strings, or match the field data types by position if not strings, e.g. integer indices. See pandas.DataFrame on how to label columns when constructing a pandas.DataFrame.
Note that all data for a group will be loaded into memory before the function is applied. This can lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for maxRecordsPerBatch is not applied on groups and it is up to the user to ensure that the grouped data will fit into the available memory.
The following example shows how to use groupby().applyInPandas() to subtract the mean from each value in the group.
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+
For detailed usage, please see pyspark.sql.GroupedData.applyInPandas.
Map operations with Pandas instances are supported by DataFrame.mapInPandas(), which maps an iterator of pandas.DataFrames representing the current PySpark DataFrame to another iterator of pandas.DataFrames, and returns the result as a PySpark DataFrame. The function takes and outputs an iterator of pandas.DataFrame. It can return output of arbitrary length, in contrast to some Pandas UDFs, although internally it works similarly to the Series to Series Pandas UDF.
The following example shows how to use mapInPandas():
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+
For detailed usage, please see pyspark.sql.DataFrame.mapInPandas.
Co-grouped map operations with Pandas instances are supported by DataFrame.groupby().cogroup().applyInPandas(), which allows two PySpark DataFrames to be cogrouped by a common key and then a Python function applied to each cogroup. It consists of the following steps:
Shuffle the data such that the groups of each DataFrame which share a key are cogrouped together.
Apply a function to each cogroup. The input of the function is two pandas.DataFrame (with an optional tuple representing the key). The output of the function is a pandas.DataFrame.
Combine the pandas.DataFrames from all groups into a new PySpark DataFrame.
To use groupBy().cogroup().applyInPandas(), the user needs to define the following:
A Python function that defines the computation for each cogroup.
A StructType object or a string that defines the schema of the output PySpark DataFrame.
Note that all data for a cogroup will be loaded into memory before the function is applied. This can lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for maxRecordsPerBatch is not applied and it is up to the user to ensure that the cogrouped data will fit into the available memory.
The following example shows how to use groupby().cogroup().applyInPandas() to perform an asof join between two datasets.
import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+
For detailed usage, please see pyspark.sql.PandasCogroupedOps.applyInPandas().
Currently, all Spark SQL data types are supported by Arrow-based conversion except MapType, ArrayType of TimestampType, and nested StructType.
Data partitions in Spark are converted into Arrow record batches, which can temporarily lead to high memory usage in the JVM. To avoid possible out of memory exceptions, the size of the Arrow record batches can be adjusted by setting the conf “spark.sql.execution.arrow.maxRecordsPerBatch” to an integer that will determine the maximum number of rows for each batch. The default value is 10,000 records per batch. If the number of columns is large, the value should be adjusted accordingly. Using this limit, each data partition will be made into 1 or more record batches for processing.
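For instance, the batch size can be lowered for very wide DataFrames; the value below is only illustrative:

# Reduce the Arrow batch size, e.g. for DataFrames with many columns
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")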
Spark internally stores timestamps as UTC values, and timestamp data that is brought in without a specified time zone is converted as local time to UTC with microsecond resolution. When timestamp data is exported or displayed in Spark, the session time zone is used to localize the timestamp values. The session time zone is set with the configuration ‘spark.sql.session.timeZone’ and will default to the JVM system local time zone if not set. Pandas uses a datetime64 type with nanosecond resolution, datetime64[ns], with optional time zone on a per-column basis.
When timestamp data is transferred from Spark to Pandas it will be converted to nanoseconds and each column will be converted to the Spark session time zone then localized to that time zone, which removes the time zone and displays values as local time. This will occur when calling toPandas() or pandas_udf with timestamp columns.
When timestamp data is transferred from Pandas to Spark, it will be converted to UTC microseconds. This occurs when calling createDataFrame with a Pandas DataFrame or when returning a timestamp from a pandas_udf. These conversions are done automatically to ensure Spark will have data in the expected format, so it is not necessary to do any of these conversions yourself. Any nanosecond values will be truncated.
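A small sketch of this round trip, assuming an existing SparkSession named spark; the chosen time zone and data are arbitrary:

import pandas as pd

# Pin the session time zone so conversions between Pandas and Spark are predictable
spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")

pdf = pd.DataFrame({"ts": pd.to_datetime(["2020-01-01 12:00:00"])})
sdf = spark.createDataFrame(pdf)  # timestamps are converted to UTC microseconds internally
result_pdf = sdf.toPandas()       # localized back to the session time zone, tz info removed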
Note that a standard UDF (non-Pandas) will load timestamp data as Python datetime objects, which is different than a Pandas timestamp. It is recommended to use Pandas time series functionality when working with timestamps in pandas_udfs to get the best performance, see here for details.
For usage with pyspark.sql, the supported version of Pandas is 0.24.2 and the supported version of PyArrow is 0.15.1. Higher versions may be used; however, compatibility and data correctness cannot be guaranteed and should be verified by the user.
Since Arrow 0.15.0, a change in the binary IPC format requires an environment variable in order to stay compatible with previous versions of Arrow <= 0.14.1. This is only necessary for users of PySpark 2.3.x and 2.4.x who have manually upgraded PyArrow to 0.15.0. The following can be added to conf/spark-env.sh to use the legacy Arrow IPC format:
ARROW_PRE_0_15_IPC_FORMAT=1
This will instruct PyArrow >= 0.15.0 to use the legacy IPC format with the older Arrow Java that is in Spark 2.3.x and 2.4.x. Not setting this environment variable will lead to a similar error as described in SPARK-29367 when running pandas_udfs or toPandas() with Arrow enabled. More information about the Arrow IPC change can be read on the Arrow 0.15.0 release blog.
This page lists an overview of all public PySpark modules, classes, functions and methods.
Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations. There are several ways to interact with Spark SQL including SQL and the Dataset API. When computing a result the same execution engine is used, independent of which API/language you are using to express the computation.
This unification means that developers can easily switch back and forth between different APIs based on which provides the most natural way to express a given transformation.
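As a brief illustration of this interchangeability (the table and column names below are made up), the same aggregation can be written against either interface:

# Assume df is a DataFrame with columns "dept" and "salary"
df.createOrReplaceTempView("employees")

# SQL
spark.sql("SELECT dept, avg(salary) AS avg_salary FROM employees GROUP BY dept").show()

# Equivalent DataFrame API
from pyspark.sql import functions as F
df.groupBy("dept").agg(F.avg("salary").alias("avg_salary")).show()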
The classes below are the core classes of the Spark SQL API in PySpark.
SparkSession(sparkContext[, jsparkSession])
SparkSession
The entry point to programming Spark with the Dataset and DataFrame API.
DataFrame(jdf, sql_ctx)
A distributed collection of data grouped into named columns.
Column(jc)
A column in a DataFrame.
Row
A row in DataFrame.
GroupedData(jgd, df)
GroupedData
A set of methods for aggregations on a DataFrame, created by DataFrame.groupBy().
DataFrame.groupBy()
DataFrameNaFunctions(df)
DataFrameNaFunctions
Functionality for working with missing data in DataFrame.
DataFrameStatFunctions(df)
DataFrameStatFunctions
Functionality for statistic functions with DataFrame.
Window
Utility functions for defining window in DataFrames.
SparkSession.builder
A class attribute having a Builder to construct SparkSession instances.
Builder
SparkSession.builder.appName(name)
SparkSession.builder.appName
Sets a name for the application, which will be shown in the Spark web UI.
SparkSession.builder.config([key, value, conf])
SparkSession.builder.config
Sets a config option.
SparkSession.builder.enableHiveSupport()
SparkSession.builder.enableHiveSupport
Enables Hive support, including connectivity to a persistent Hive metastore, support for Hive SerDes, and Hive user-defined functions.
SparkSession.builder.getOrCreate()
SparkSession.builder.getOrCreate
Gets an existing SparkSession or, if there is no existing one, creates a new one based on the options set in this builder.
SparkSession.builder.master(master)
SparkSession.builder.master
Sets the Spark master URL to connect to, such as “local” to run locally, “local[4]” to run locally with 4 cores, or “spark://master:7077” to run on a Spark standalone cluster.
SparkSession.catalog
Interface through which the user may create, drop, alter or query underlying databases, tables, functions, etc.
SparkSession.conf
Runtime configuration interface for Spark.
SparkSession.createDataFrame(data[, schema, …])
SparkSession.createDataFrame
Creates a DataFrame from an RDD, a list or a pandas.DataFrame.
RDD
SparkSession.getActiveSession()
SparkSession.getActiveSession
Returns the active SparkSession for the current thread, returned by the builder.
SparkSession.newSession()
SparkSession.newSession
Returns a new SparkSession as new session, that has separate SQLConf, registered temporary views and UDFs, but shared SparkContext and table cache.
SparkSession.range(start[, end, step, …])
SparkSession.range
Create a DataFrame with single pyspark.sql.types.LongType column named id, containing elements in a range from start to end (exclusive) with step value step.
pyspark.sql.types.LongType
id
start
end
step
SparkSession.read
Returns a DataFrameReader that can be used to read data in as a DataFrame.
DataFrameReader
SparkSession.readStream
Returns a DataStreamReader that can be used to read data streams as a streaming DataFrame.
DataStreamReader
SparkSession.sparkContext
Returns the underlying SparkContext.
SparkContext
SparkSession.sql(sqlQuery)
SparkSession.sql
Returns a DataFrame representing the result of the given query.
SparkSession.stop()
SparkSession.stop
Stop the underlying SparkContext.
SparkSession.streams
Returns a StreamingQueryManager that allows managing all the StreamingQuery instances active on this context.
StreamingQueryManager
StreamingQuery
SparkSession.table(tableName)
SparkSession.table
Returns the specified table as a DataFrame.
SparkSession.udf
Returns a UDFRegistration for UDF registration.
UDFRegistration
SparkSession.version
The version of Spark on which this application is running.
DataFrameReader.csv(path[, schema, sep, …])
DataFrameReader.csv
Loads a CSV file and returns the result as a DataFrame.
DataFrameReader.format(source)
DataFrameReader.format
Specifies the input data source format.
DataFrameReader.jdbc(url, table[, column, …])
DataFrameReader.jdbc
Construct a DataFrame representing the database table named table accessible via JDBC URL url and connection properties.
table
url
properties
DataFrameReader.json(path[, schema, …])
DataFrameReader.json
Loads JSON files and returns the results as a DataFrame.
DataFrameReader.load([path, format, schema])
DataFrameReader.load
Loads data from a data source and returns it as a DataFrame.
DataFrameReader.option(key, value)
DataFrameReader.option
Adds an input option for the underlying data source.
DataFrameReader.options(**options)
DataFrameReader.options
Adds input options for the underlying data source.
DataFrameReader.orc(path[, mergeSchema, …])
DataFrameReader.orc
Loads ORC files, returning the result as a DataFrame.
DataFrameReader.parquet(*paths, **options)
DataFrameReader.parquet
Loads Parquet files, returning the result as a DataFrame.
DataFrameReader.schema(schema)
DataFrameReader.schema
Specifies the input schema.
DataFrameReader.table(tableName)
DataFrameReader.table
DataFrameReader.text(paths[, wholetext, …])
DataFrameReader.text
Loads text files and returns a DataFrame whose schema starts with a string column named “value”, and followed by partitioned columns if there are any.
DataFrameWriter.bucketBy(numBuckets, col, *cols)
DataFrameWriter.bucketBy
Buckets the output by the given columns. If specified, the output is laid out on the file system similar to Hive’s bucketing scheme.
DataFrameWriter.csv(path[, mode, …])
DataFrameWriter.csv
Saves the content of the DataFrame in CSV format at the specified path.
DataFrameWriter.format(source)
DataFrameWriter.format
Specifies the underlying output data source.
DataFrameWriter.insertInto(tableName[, …])
DataFrameWriter.insertInto
Inserts the content of the DataFrame to the specified table.
DataFrameWriter.jdbc(url, table[, mode, …])
DataFrameWriter.jdbc
Saves the content of the DataFrame to an external database table via JDBC.
DataFrameWriter.json(path[, mode, …])
DataFrameWriter.json
Saves the content of the DataFrame in JSON format (JSON Lines text format or newline-delimited JSON) at the specified path.
DataFrameWriter.mode(saveMode)
DataFrameWriter.mode
Specifies the behavior when data or table already exists.
DataFrameWriter.option(key, value)
DataFrameWriter.option
Adds an output option for the underlying data source.
DataFrameWriter.options(**options)
DataFrameWriter.options
Adds output options for the underlying data source.
DataFrameWriter.orc(path[, mode, …])
DataFrameWriter.orc
Saves the content of the DataFrame in ORC format at the specified path.
DataFrameWriter.parquet(path[, mode, …])
DataFrameWriter.parquet
Saves the content of the DataFrame in Parquet format at the specified path.
DataFrameWriter.partitionBy(*cols)
DataFrameWriter.partitionBy
Partitions the output by the given columns on the file system.
DataFrameWriter.save([path, format, mode, …])
DataFrameWriter.save
Saves the contents of the DataFrame to a data source.
DataFrameWriter.saveAsTable(name[, format, …])
DataFrameWriter.saveAsTable
Saves the content of the DataFrame as the specified table.
DataFrameWriter.sortBy(col, *cols)
DataFrameWriter.sortBy
Sorts the output in each bucket by the given columns on the file system.
DataFrameWriter.text(path[, compression, …])
DataFrameWriter.text
Saves the content of the DataFrame in a text file at the specified path.
DataFrame.agg(*exprs)
DataFrame.agg
Aggregate on the entire DataFrame without groups (shorthand for df.groupBy.agg()).
df.groupBy.agg()
DataFrame.alias(alias)
DataFrame.alias
Returns a new DataFrame with an alias set.
DataFrame.approxQuantile(col, probabilities, …)
DataFrame.approxQuantile
Calculates the approximate quantiles of numerical columns of a DataFrame.
DataFrame.cache()
DataFrame.cache
Persists the DataFrame with the default storage level (MEMORY_AND_DISK).
DataFrame.checkpoint([eager])
DataFrame.checkpoint
Returns a checkpointed version of this Dataset.
DataFrame.coalesce(numPartitions)
DataFrame.coalesce
Returns a new DataFrame that has exactly numPartitions partitions.
DataFrame.colRegex(colName)
DataFrame.colRegex
Selects column based on the column name specified as a regex and returns it as Column.
DataFrame.collect()
DataFrame.collect
Returns all the records as a list of Row.
DataFrame.columns
Returns all column names as a list.
DataFrame.corr(col1, col2[, method])
DataFrame.corr
Calculates the correlation of two columns of a DataFrame as a double value.
DataFrame.count()
DataFrame.count
Returns the number of rows in this DataFrame.
DataFrame.cov(col1, col2)
DataFrame.cov
Calculate the sample covariance for the given columns, specified by their names, as a double value.
DataFrame.createGlobalTempView(name)
DataFrame.createGlobalTempView
Creates a global temporary view with this DataFrame.
DataFrame.createOrReplaceGlobalTempView(name)
DataFrame.createOrReplaceGlobalTempView
Creates or replaces a global temporary view using the given name.
DataFrame.createOrReplaceTempView(name)
DataFrame.createOrReplaceTempView
Creates or replaces a local temporary view with this DataFrame.
DataFrame.createTempView(name)
DataFrame.createTempView
Creates a local temporary view with this DataFrame.
DataFrame.crossJoin(other)
DataFrame.crossJoin
Returns the cartesian product with another DataFrame.
DataFrame.crosstab(col1, col2)
DataFrame.crosstab
Computes a pair-wise frequency table of the given columns.
DataFrame.cube(*cols)
DataFrame.cube
Create a multi-dimensional cube for the current DataFrame using the specified columns, so we can run aggregations on them.
DataFrame.describe(*cols)
DataFrame.describe
Computes basic statistics for numeric and string columns.
DataFrame.distinct()
DataFrame.distinct
Returns a new DataFrame containing the distinct rows in this DataFrame.
DataFrame.drop(*cols)
DataFrame.drop
Returns a new DataFrame that drops the specified column.
DataFrame.dropDuplicates([subset])
DataFrame.dropDuplicates
Return a new DataFrame with duplicate rows removed, optionally only considering certain columns.
DataFrame.drop_duplicates([subset])
DataFrame.drop_duplicates
drop_duplicates() is an alias for dropDuplicates().
drop_duplicates()
dropDuplicates()
DataFrame.dropna([how, thresh, subset])
DataFrame.dropna
Returns a new DataFrame omitting rows with null values.
DataFrame.dtypes
Returns all column names and their data types as a list.
DataFrame.exceptAll(other)
DataFrame.exceptAll
Return a new DataFrame containing rows in this DataFrame but not in another DataFrame while preserving duplicates.
DataFrame.explain([extended, mode])
DataFrame.explain
Prints the (logical and physical) plans to the console for debugging purpose.
DataFrame.fillna(value[, subset])
DataFrame.fillna
Replace null values, alias for na.fill().
na.fill()
DataFrame.filter(condition)
DataFrame.filter
Filters rows using the given condition.
DataFrame.first()
DataFrame.first
Returns the first row as a Row.
DataFrame.foreach(f)
DataFrame.foreach
Applies the f function to all Row of this DataFrame.
f
DataFrame.foreachPartition(f)
DataFrame.foreachPartition
Applies the f function to each partition of this DataFrame.
DataFrame.freqItems(cols[, support])
DataFrame.freqItems
Finding frequent items for columns, possibly with false positives.
DataFrame.groupBy(*cols)
Groups the DataFrame using the specified columns, so we can run aggregation on them.
DataFrame.groupby(*cols)
DataFrame.groupby
groupby() is an alias for groupBy().
groupby()
groupBy()
DataFrame.head([n])
DataFrame.head
Returns the first n rows.
n
DataFrame.hint(name, *parameters)
DataFrame.hint
Specifies some hint on the current DataFrame.
DataFrame.inputFiles()
DataFrame.inputFiles
Returns a best-effort snapshot of the files that compose this DataFrame.
DataFrame.intersect(other)
DataFrame.intersect
Return a new DataFrame containing rows only in both this DataFrame and another DataFrame.
DataFrame.intersectAll(other)
DataFrame.intersectAll
Return a new DataFrame containing rows in both this DataFrame and another DataFrame while preserving duplicates.
DataFrame.isLocal()
DataFrame.isLocal
Returns True if the collect() and take() methods can be run locally (without any Spark executors).
True
collect()
take()
DataFrame.isStreaming
Returns True if this Dataset contains one or more sources that continuously return data as it arrives.
Dataset
DataFrame.join(other[, on, how])
DataFrame.join
Joins with another DataFrame, using the given join expression.
DataFrame.limit(num)
DataFrame.limit
Limits the result count to the number specified.
DataFrame.localCheckpoint([eager])
DataFrame.localCheckpoint
Returns a locally checkpointed version of this Dataset.
DataFrame.mapInPandas(func, schema)
DataFrame.mapInPandas
Maps an iterator of batches in the current DataFrame using a Python native function that takes and outputs a pandas DataFrame, and returns the result as a DataFrame.
DataFrame.na
Returns a DataFrameNaFunctions for handling missing values.
DataFrame.orderBy(*cols, **kwargs)
DataFrame.orderBy
Returns a new DataFrame sorted by the specified column(s).
DataFrame.persist([storageLevel])
DataFrame.persist
Sets the storage level to persist the contents of the DataFrame across operations after the first time it is computed.
DataFrame.printSchema()
DataFrame.printSchema
Prints out the schema in the tree format.
DataFrame.randomSplit(weights[, seed])
DataFrame.randomSplit
Randomly splits this DataFrame with the provided weights.
DataFrame.rdd
Returns the content as a pyspark.RDD of Row.
pyspark.RDD
DataFrame.registerTempTable(name)
DataFrame.registerTempTable
Registers this DataFrame as a temporary table using the given name.
DataFrame.repartition(numPartitions, *cols)
DataFrame.repartition
Returns a new DataFrame partitioned by the given partitioning expressions.
DataFrame.repartitionByRange(numPartitions, …)
DataFrame.repartitionByRange
DataFrame.replace(to_replace[, value, subset])
DataFrame.replace
Returns a new DataFrame replacing a value with another value.
DataFrame.rollup(*cols)
DataFrame.rollup
Create a multi-dimensional rollup for the current DataFrame using the specified columns, so we can run aggregation on them.
DataFrame.sameSemantics(other)
DataFrame.sameSemantics
Returns True when the logical query plans inside both DataFrames are equal and therefore return same results.
DataFrame.sample([withReplacement, …])
DataFrame.sample
Returns a sampled subset of this DataFrame.
DataFrame.sampleBy(col, fractions[, seed])
DataFrame.sampleBy
Returns a stratified sample without replacement based on the fraction given on each stratum.
DataFrame.schema
Returns the schema of this DataFrame as a pyspark.sql.types.StructType.
pyspark.sql.types.StructType
DataFrame.select(*cols)
DataFrame.select
Projects a set of expressions and returns a new DataFrame.
DataFrame.selectExpr(*expr)
DataFrame.selectExpr
Projects a set of SQL expressions and returns a new DataFrame.
DataFrame.semanticHash()
DataFrame.semanticHash
Returns a hash code of the logical query plan against this DataFrame.
DataFrame.show([n, truncate, vertical])
DataFrame.show
Prints the first n rows to the console.
DataFrame.sort(*cols, **kwargs)
DataFrame.sort
DataFrame.sortWithinPartitions(*cols, **kwargs)
DataFrame.sortWithinPartitions
Returns a new DataFrame with each partition sorted by the specified column(s).
DataFrame.stat
Returns a DataFrameStatFunctions for statistic functions.
DataFrame.storageLevel
Get the DataFrame’s current storage level.
DataFrame.subtract(other)
DataFrame.subtract
Return a new DataFrame containing rows in this DataFrame but not in another DataFrame.
DataFrame.summary(*statistics)
DataFrame.summary
Computes specified statistics for numeric and string columns.
DataFrame.tail(num)
DataFrame.tail
Returns the last num rows as a list of Row.
num
list
DataFrame.take(num)
DataFrame.take
Returns the first num rows as a list of Row.
DataFrame.toDF(*cols)
DataFrame.toDF
Returns a new DataFrame with new specified column names.
DataFrame.toJSON([use_unicode])
DataFrame.toJSON
Converts a DataFrame into an RDD of string.
DataFrame.toLocalIterator([prefetchPartitions])
DataFrame.toLocalIterator
Returns an iterator that contains all of the rows in this DataFrame.
DataFrame.toPandas()
DataFrame.toPandas
Returns the contents of this DataFrame as Pandas pandas.DataFrame.
DataFrame.transform(func)
DataFrame.transform
Returns a new DataFrame.
DataFrame.union(other)
DataFrame.union
Return a new DataFrame containing union of rows in this and another DataFrame.
DataFrame.unionAll(other)
DataFrame.unionAll
DataFrame.unionByName(other)
DataFrame.unionByName
Returns a new DataFrame containing union of rows in this and another DataFrame.
DataFrame.unpersist([blocking])
DataFrame.unpersist
Marks the DataFrame as non-persistent, and remove all blocks for it from memory and disk.
DataFrame.where(condition)
DataFrame.where
where() is an alias for filter().
where()
filter()
DataFrame.withColumn(colName, col)
DataFrame.withColumn
Returns a new DataFrame by adding a column or replacing the existing column that has the same name.
DataFrame.withColumnRenamed(existing, new)
DataFrame.withColumnRenamed
Returns a new DataFrame by renaming an existing column.
DataFrame.withWatermark(eventTime, …)
DataFrame.withWatermark
Defines an event time watermark for this DataFrame.
DataFrame.write
Interface for saving the content of the non-streaming DataFrame out into external storage.
DataFrame.writeStream
Interface for saving the content of the streaming DataFrame out into external storage.
DataFrameNaFunctions.drop([how, thresh, subset])
DataFrameNaFunctions.drop
DataFrameNaFunctions.fill(value[, subset])
DataFrameNaFunctions.fill
DataFrameNaFunctions.replace(to_replace[, …])
DataFrameNaFunctions.replace
DataFrameStatFunctions.approxQuantile(col, …)
DataFrameStatFunctions.approxQuantile
DataFrameStatFunctions.corr(col1, col2[, method])
DataFrameStatFunctions.corr
DataFrameStatFunctions.cov(col1, col2)
DataFrameStatFunctions.cov
DataFrameStatFunctions.crosstab(col1, col2)
DataFrameStatFunctions.crosstab
DataFrameStatFunctions.freqItems(cols[, support])
DataFrameStatFunctions.freqItems
DataFrameStatFunctions.sampleBy(col, fractions)
DataFrameStatFunctions.sampleBy
ArrayType(elementType[, containsNull])
Array data type.
BinaryType
Binary (byte array) data type.
BooleanType
Boolean data type.
ByteType
Byte data type, i.e. a signed integer in a single byte.
DataType
Base class for data types.
DateType
Date (datetime.date) data type.
DecimalType([precision, scale])
DecimalType
Decimal (decimal.Decimal) data type.
DoubleType
Double data type, representing double precision floats.
FloatType
Float data type, representing single precision floats.
IntegerType
Int data type, i.e. a signed 32-bit integer.
LongType
Long data type, i.e. a signed 64-bit integer.
MapType(keyType, valueType[, valueContainsNull])
Map data type.
NullType
Null type.
ShortType
Short data type, i.e. a signed 16-bit integer.
StringType
String data type.
StructField(name, dataType[, nullable, metadata])
StructField
A field in StructType.
StructType([fields])
Struct type, consisting of a list of StructField.
TimestampType
Timestamp (datetime.datetime) data type.
abs(col)
abs
Computes the absolute value.
acos(col)
acos
inverse cosine of col, as if computed by java.lang.Math.acos()
add_months(start, months)
add_months
Returns the date that is months months after start
aggregate(col, zero, merge[, finish])
aggregate
Applies a binary operator to an initial state and all elements in the array, and reduces this to a single state.
approxCountDistinct(col[, rsd])
approxCountDistinct
Deprecated in 2.1, use approx_count_distinct() instead.
approx_count_distinct()
approx_count_distinct(col[, rsd])
approx_count_distinct
Aggregate function: returns a new Column for approximate distinct count of column col.
array(*cols)
array
Creates a new array column.
array_contains(col, value)
array_contains
Collection function: returns null if the array is null, true if the array contains the given value, and false otherwise.
array_distinct(col)
array_distinct
Collection function: removes duplicate values from the array.
array_except(col1, col2)
array_except
Collection function: returns an array of the elements in col1 but not in col2, without duplicates.
array_intersect(col1, col2)
array_intersect
Collection function: returns an array of the elements in the intersection of col1 and col2, without duplicates.
array_join(col, delimiter[, null_replacement])
array_join
Concatenates the elements of column using the delimiter.
array_max(col)
array_max
Collection function: returns the maximum value of the array.
array_min(col)
array_min
Collection function: returns the minimum value of the array.
array_position(col, value)
array_position
Collection function: Locates the position of the first occurrence of the given value in the given array.
array_remove(col, element)
array_remove
Collection function: Remove all elements that equal to element from the given array.
array_repeat(col, count)
array_repeat
Collection function: creates an array containing a column repeated count times.
array_sort(col)
array_sort
Collection function: sorts the input array in ascending order.
array_union(col1, col2)
array_union
Collection function: returns an array of the elements in the union of col1 and col2, without duplicates.
arrays_overlap(a1, a2)
arrays_overlap
Collection function: returns true if the arrays contain any common non-null element; if not, returns null if both the arrays are non-empty and any of them contains a null element; returns false otherwise.
arrays_zip(*cols)
arrays_zip
Collection function: Returns a merged array of structs in which the N-th struct contains all N-th values of input arrays.
asc(col)
asc
Returns a sort expression based on the ascending order of the given column name.
asc_nulls_first(col)
asc_nulls_first
Returns a sort expression based on the ascending order of the given column name, and null values return before non-null values.
asc_nulls_last(col)
asc_nulls_last
Returns a sort expression based on the ascending order of the given column name, and null values appear after non-null values.
ascii(col)
ascii
Computes the numeric value of the first character of the string column.
asin(col)
asin
inverse sine of col, as if computed by java.lang.Math.asin()
atan(col)
atan
inverse tangent of col, as if computed by java.lang.Math.atan()
atan2(col1, col2)
atan2
Returns the angle theta from the conversion of rectangular coordinates (x, y) to polar coordinates (r, theta).
avg(col)
avg
Aggregate function: returns the average of the values in a group.
base64(col)
base64
Computes the BASE64 encoding of a binary column and returns it as a string column.
bin(col)
bin
Returns the string representation of the binary value of the given column.
bitwiseNOT(col)
bitwiseNOT
Computes bitwise not.
broadcast(df)
broadcast
Marks a DataFrame as small enough for use in broadcast joins.
bround(col[, scale])
bround
Round the given value to scale decimal places using HALF_EVEN rounding mode if scale >= 0 or at integral part when scale < 0.
cbrt(col)
cbrt
Computes the cube-root of the given value.
ceil(col)
ceil
Computes the ceiling of the given value.
coalesce(*cols)
coalesce
Returns the first column that is not null.
col(col)
col
Returns a Column based on the given column name.
collect_list(col)
collect_list
Aggregate function: returns a list of objects with duplicates.
collect_set(col)
collect_set
Aggregate function: returns a set of objects with duplicate elements eliminated.
column(col)
column
concat(*cols)
concat
Concatenates multiple input columns together into a single column.
concat_ws(sep, *cols)
concat_ws
Concatenates multiple input string columns together into a single string column, using the given separator.
conv(col, fromBase, toBase)
conv
Convert a number in a string column from one base to another.
corr(col1, col2)
corr
Returns a new Column for the Pearson Correlation Coefficient for col1 and col2.
col1
col2
cos(col)
cos
Computes the cosine of the given value (angle in radians).
cosh(col)
cosh
Computes the hyperbolic cosine of the given value.
count(col)
count
Aggregate function: returns the number of items in a group.
countDistinct(col, *cols)
countDistinct
Returns a new Column for distinct count of col or cols.
cols
covar_pop(col1, col2)
covar_pop
Returns a new Column for the population covariance of col1 and col2.
covar_samp(col1, col2)
covar_samp
Returns a new Column for the sample covariance of col1 and col2.
crc32(col)
crc32
Calculates the cyclic redundancy check value (CRC32) of a binary column and returns the value as a bigint.
create_map(*cols)
create_map
Creates a new map column.
cume_dist()
cume_dist
Window function: returns the cumulative distribution of values within a window partition, i.e. the fraction of rows that are below the current row.
current_date()
current_date
Returns the current date as a DateType column.
current_timestamp()
current_timestamp
Returns the current timestamp as a TimestampType column.
date_add(start, days)
date_add
Returns the date that is days days after start
date_format(date, format)
date_format
Converts a date/timestamp/string to a value of string in the format specified by the date format given by the second argument.
date_sub(start, days)
date_sub
Returns the date that is days days before start
date_trunc(format, timestamp)
date_trunc
Returns timestamp truncated to the unit specified by the format.
datediff(end, start)
datediff
Returns the number of days from start to end.
dayofmonth(col)
dayofmonth
Extract the day of the month of a given date as integer.
dayofweek(col)
dayofweek
Extract the day of the week of a given date as integer.
dayofyear(col)
dayofyear
Extract the day of the year of a given date as integer.
decode(col, charset)
decode
Computes the first argument into a string from a binary using the provided character set (one of ‘US-ASCII’, ‘ISO-8859-1’, ‘UTF-8’, ‘UTF-16BE’, ‘UTF-16LE’, ‘UTF-16’).
degrees(col)
degrees
Converts an angle measured in radians to an approximately equivalent angle measured in degrees.
dense_rank()
dense_rank
Window function: returns the rank of rows within a window partition, without any gaps.
desc(col)
desc
Returns a sort expression based on the descending order of the given column name.
desc_nulls_first(col)
desc_nulls_first
Returns a sort expression based on the descending order of the given column name, and null values appear before non-null values.
desc_nulls_last(col)
desc_nulls_last
Returns a sort expression based on the descending order of the given column name, and null values appear after non-null values
element_at(col, extraction)
element_at
Collection function: Returns element of array at given index in extraction if col is array.
encode(col, charset)
encode
Computes the first argument into a binary from a string using the provided character set (one of ‘US-ASCII’, ‘ISO-8859-1’, ‘UTF-8’, ‘UTF-16BE’, ‘UTF-16LE’, ‘UTF-16’).
exists(col, f)
exists
Returns whether a predicate holds for one or more elements in the array.
exp(col)
exp
Computes the exponential of the given value.
explode(col)
explode
Returns a new row for each element in the given array or map.
explode_outer(col)
explode_outer
expm1(col)
expm1
Computes the exponential of the given value minus one.
expr(str)
expr
Parses the expression string into the column that it represents
factorial(col)
factorial
Computes the factorial of the given value.
filter(col, f)
filter
Returns an array of elements for which a predicate holds in a given array.
first(col[, ignorenulls])
first
Aggregate function: returns the first value in a group.
flatten(col)
flatten
Collection function: creates a single array from an array of arrays.
floor(col)
floor
Computes the floor of the given value.
forall(col, f)
forall
Returns whether a predicate holds for every element in the array.
format_number(col, d)
format_number
Formats the number X to a format like '#,###,###.##', rounded to d decimal places with HALF_EVEN round mode, and returns the result as a string.
format_string(format, *cols)
format_string
Formats the arguments in printf-style and returns the result as a string column.
from_csv(col, schema[, options])
from_csv
Parses a column containing a CSV string to a row with the specified schema.
from_json(col, schema[, options])
from_json
Parses a column containing a JSON string into a MapType with StringType as keys type, StructType or ArrayType with the specified schema.
from_unixtime(timestamp[, format])
from_unixtime
Converts the number of seconds from unix epoch (1970-01-01 00:00:00 UTC) to a string representing the timestamp of that moment in the current system time zone in the given format.
from_utc_timestamp(timestamp, tz)
from_utc_timestamp
This is a common function for databases supporting TIMESTAMP WITHOUT TIMEZONE.
get_json_object(col, path)
get_json_object
Extracts json object from a json string based on json path specified, and returns json string of the extracted json object.
greatest(*cols)
greatest
Returns the greatest value of the list of column names, skipping null values.
grouping(col)
grouping
Aggregate function: indicates whether a specified column in a GROUP BY list is aggregated or not, returns 1 for aggregated or 0 for not aggregated in the result set.
grouping_id(*cols)
grouping_id
Aggregate function: returns the level of grouping, equal to (grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + … + grouping(cn).
hash(*cols)
hash
Calculates the hash code of given columns, and returns the result as an int column.
hex(col)
hex
Computes hex value of the given column, which could be pyspark.sql.types.StringType, pyspark.sql.types.BinaryType, pyspark.sql.types.IntegerType or pyspark.sql.types.LongType.
hour(col)
hour
Extract the hours of a given date as integer.
hypot(col1, col2)
hypot
Computes sqrt(a^2 + b^2) without intermediate overflow or underflow.
ignore_unicode_prefix(f)
ignore_unicode_prefix
Ignore the 'u' prefix of strings in doctests, to make them work in both Python 2 and 3.
initcap(col)
initcap
Translate the first letter of each word to upper case in the sentence.
input_file_name()
input_file_name
Creates a string column for the file name of the current Spark task.
instr(str, substr)
instr
Locate the position of the first occurrence of substr column in the given string.
isnan(col)
isnan
An expression that returns true iff the column is NaN.
isnull(col)
isnull
An expression that returns true iff the column is null.
json_tuple(col, *fields)
json_tuple
Creates a new row for a json column according to the given field names.
kurtosis(col)
kurtosis
Aggregate function: returns the kurtosis of the values in a group.
lag(col[, offset, default])
lag
Window function: returns the value that is offset rows before the current row, and defaultValue if there are fewer than offset rows before the current row.
last(col[, ignorenulls])
last
Aggregate function: returns the last value in a group.
last_day(date)
last_day
Returns the last day of the month which the given date belongs to.
lead(col[, offset, default])
lead
Window function: returns the value that is offset rows after the current row, and defaultValue if there are fewer than offset rows after the current row.
least(*cols)
least
Returns the least value of the list of column names, skipping null values.
length(col)
length
Computes the character length of string data or number of bytes of binary data.
levenshtein(left, right)
levenshtein
Computes the Levenshtein distance of the two given strings.
lit(col)
lit
Creates a Column of literal value.
locate(substr, str[, pos])
locate
Locate the position of the first occurrence of substr in a string column, after position pos.
log(arg1[, arg2])
log
Returns the first argument-based logarithm of the second argument.
log10(col)
log10
Computes the logarithm of the given value in Base 10.
log1p(col)
log1p
Computes the natural logarithm of the given value plus one.
log2(col)
log2
Returns the base-2 logarithm of the argument.
lower(col)
lower
Converts a string expression to lower case.
lpad(col, len, pad)
lpad
Left-pad the string column to width len with pad.
ltrim(col)
ltrim
Trim the spaces from left end for the specified string value.
map_concat(*cols)
map_concat
Returns the union of all the given maps.
map_entries(col)
map_entries
Collection function: Returns an unordered array of all entries in the given map.
map_filter(col, f)
map_filter
Returns a map whose key-value pairs satisfy a predicate.
map_from_arrays(col1, col2)
map_from_arrays
Creates a new map from two arrays.
map_from_entries(col)
map_from_entries
Collection function: Returns a map created from the given array of entries.
map_keys(col)
map_keys
Collection function: Returns an unordered array containing the keys of the map.
map_values(col)
map_values
Collection function: Returns an unordered array containing the values of the map.
map_zip_with(col1, col2, f)
map_zip_with
Merge two given maps, key-wise into a single map using a function.
max(col)
max
Aggregate function: returns the maximum value of the expression in a group.
md5(col)
md5
Calculates the MD5 digest and returns the value as a 32 character hex string.
mean(col)
mean
Aggregate function: returns the average of the values in a group.
min(col)
min
Aggregate function: returns the minimum value of the expression in a group.
minute(col)
minute
Extract the minutes of a given date as integer.
monotonically_increasing_id()
monotonically_increasing_id
A column that generates monotonically increasing 64-bit integers.
month(col)
month
Extract the month of a given date as integer.
months_between(date1, date2[, roundOff])
months_between
Returns number of months between dates date1 and date2.
nanvl(col1, col2)
nanvl
Returns col1 if it is not NaN, or col2 if col1 is NaN.
next_day(date, dayOfWeek)
next_day
Returns the first date which is later than the value of the date column.
ntile(n)
ntile
Window function: returns the ntile group id (from 1 to n inclusive) in an ordered window partition.
overlay(src, replace, pos[, len])
overlay
Overlay the specified portion of src with replace, starting from byte position pos of src and proceeding for len bytes.
pandas_udf([f, returnType, functionType])
pandas_udf
Creates a pandas user defined function (a.k.a. vectorized user defined function).
percent_rank()
percent_rank
Window function: returns the relative rank (i.e. percentile) of rows within a window partition.
percentile_approx(col, percentage[, accuracy])
percentile_approx
Returns the approximate percentile value of numeric column col at the given percentage.
posexplode(col)
posexplode
Returns a new row for each element with position in the given array or map.
posexplode_outer(col)
posexplode_outer
Returns a new row for each element with position in the given array or map; unlike posexplode, if the array/map is null or empty, nulls are produced.
pow(col1, col2)
pow
Returns the value of the first argument raised to the power of the second argument.
quarter(col)
quarter
Extract the quarter of a given date as integer.
radians(col)
radians
Converts an angle measured in degrees to an approximately equivalent angle measured in radians.
rand([seed])
rand
Generates a random column with independent and identically distributed (i.i.d.) samples uniformly distributed in [0.0, 1.0).
randn([seed])
randn
Generates a column with independent and identically distributed (i.i.d.) samples from the standard normal distribution.
rank()
rank
Window function: returns the rank of rows within a window partition.
regexp_extract(str, pattern, idx)
regexp_extract
Extract a specific group matched by a Java regex, from the specified string column.
regexp_replace(str, pattern, replacement)
regexp_replace
Replace all substrings of the specified string value that match regexp with rep.
repeat(col, n)
repeat
Repeats a string column n times, and returns it as a new string column.
reverse(col)
reverse
Collection function: returns a reversed string or an array with reverse order of elements.
rint(col)
rint
Returns the double value that is closest in value to the argument and is equal to a mathematical integer.
round(col[, scale])
round
Round the given value to scale decimal places using HALF_UP rounding mode if scale >= 0 or at integral part when scale < 0.
row_number()
row_number
Window function: returns a sequential number starting at 1 within a window partition.
rpad(col, len, pad)
rpad
Right-pad the string column to width len with pad.
rtrim(col)
rtrim
Trim the spaces from right end for the specified string value.
schema_of_csv(csv[, options])
schema_of_csv
Parses a CSV string and infers its schema in DDL format.
schema_of_json(json[, options])
schema_of_json
Parses a JSON string and infers its schema in DDL format.
second(col)
second
Extract the seconds of a given date as integer.
sequence(start, stop[, step])
sequence
Generate a sequence of integers from start to stop, incrementing by step.
sha1(col)
sha1
Returns the hex string result of SHA-1.
sha2(col, numBits)
sha2
Returns the hex string result of SHA-2 family of hash functions (SHA-224, SHA-256, SHA-384, and SHA-512).
shiftLeft(col, numBits)
shiftLeft
Shift the given value numBits left.
shiftRight(col, numBits)
shiftRight
(Signed) shift the given value numBits right.
shiftRightUnsigned(col, numBits)
shiftRightUnsigned
Unsigned shift the given value numBits right.
shuffle(col)
shuffle
Collection function: Generates a random permutation of the given array.
signum(col)
signum
Computes the signum of the given value.
sin(col)
sin
Computes the sine of the given value (in radians).
since(version)
since
A decorator that annotates a function to append the version of Spark in which the function was added.
sinh(col)
sinh
Computes the hyperbolic sine of the given value.
size(col)
size
Collection function: returns the length of the array or map stored in the column.
skewness(col)
skewness
Aggregate function: returns the skewness of the values in a group.
slice(x, start, length)
slice
Collection function: returns an array containing all the elements in x from index start (array indices start at 1, or from the end if start is negative) with the specified length.
sort_array(col[, asc])
sort_array
Collection function: sorts the input array in ascending or descending order according to the natural ordering of the array elements.
soundex(col)
soundex
Returns the SoundEx encoding for a string
spark_partition_id()
spark_partition_id
A column for partition ID.
split(str, pattern[, limit])
split
Splits str around matches of the given pattern.
sqrt(col)
sqrt
Computes the square root of the specified float value.
stddev(col)
stddev
Aggregate function: alias for stddev_samp.
stddev_pop(col)
stddev_pop
Aggregate function: returns population standard deviation of the expression in a group.
stddev_samp(col)
stddev_samp
Aggregate function: returns the unbiased sample standard deviation of the expression in a group.
struct(*cols)
struct
Creates a new struct column.
substring(str, pos, len)
substring
Substring starts at pos and is of length len when str is String type or returns the slice of byte array that starts at pos in byte and is of length len when str is Binary type.
substring_index(str, delim, count)
substring_index
Returns the substring from string str before count occurrences of the delimiter delim.
sum(col)
sum
Aggregate function: returns the sum of all values in the expression.
sumDistinct(col)
sumDistinct
Aggregate function: returns the sum of distinct values in the expression.
tan(col)
tan
Computes the tangent of the given value (in radians).
tanh(col)
tanh
Computes the hyperbolic tangent of the given value.
toDegrees(col)
toDegrees
Deprecated in 2.1, use degrees() instead.
toRadians(col)
toRadians
Deprecated in 2.1, use radians() instead.
to_csv(col[, options])
to_csv
Converts a column containing a StructType into a CSV string.
to_date(col[, format])
to_date
Converts a Column into pyspark.sql.types.DateType using the optionally specified format.
to_json(col[, options])
to_json
Converts a column containing a StructType, ArrayType or a MapType into a JSON string.
to_str(value)
to_str
A wrapper over str(), but converts bool values to lower case strings.
to_timestamp(col[, format])
to_timestamp
Converts a Column into pyspark.sql.types.TimestampType using the optionally specified format.
to_utc_timestamp(timestamp, tz)
to_utc_timestamp
This is a common function for databases supporting TIMESTAMP WITHOUT TIMEZONE.
transform(col, f)
transform
Returns an array of elements after applying a transformation to each element in the input array.
transform_keys(col, f)
transform_keys
Applies a function to every key-value pair in a map and returns a map with the results of those applications as the new keys for the pairs.
transform_values(col, f)
transform_values
Applies a function to every key-value pair in a map and returns a map with the results of those applications as the new values for the pairs.
translate(srcCol, matching, replace)
translate
A function that translates any character in srcCol to the corresponding character in matching.
trim(col)
trim
Trim the spaces from both ends for the specified string column.
trunc(date, format)
trunc
Returns date truncated to the unit specified by the format.
udf([f, returnType])
udf
Creates a user defined function (UDF).
unbase64(col)
unbase64
Decodes a BASE64 encoded string column and returns it as a binary column.
unhex(col)
unhex
Inverse of hex.
unix_timestamp([timestamp, format])
unix_timestamp
Converts a time string with the given pattern ('yyyy-MM-dd HH:mm:ss' by default) to a Unix timestamp (in seconds), using the default timezone and the default locale; returns null if the conversion fails.
upper(col)
upper
Converts a string expression to upper case.
var_pop(col)
var_pop
Aggregate function: returns the population variance of the values in a group.
var_samp(col)
var_samp
Aggregate function: returns the unbiased sample variance of the values in a group.
variance(col)
variance
Aggregate function: alias for var_samp.
weekofyear(col)
weekofyear
Extract the week number of a given date as integer.
when(condition, value)
when
Evaluates a list of conditions and returns one of multiple possible result expressions.
window(timeColumn, windowDuration[, …])
window
Bucketize rows into one or more time windows given a timestamp specifying column.
xxhash64(*cols)
xxhash64
Calculates the hash code of given columns using the 64-bit variant of the xxHash algorithm, and returns the result as a long column.
year(col)
year
Extract the year of a given date as integer.
zip_with(col1, col2, f)
zip_with
Merge two given arrays, element-wise, into a single array using a function.
from_avro(data, jsonFormatSchema[, options])
from_avro
Converts a binary column of Avro format into its corresponding catalyst value.
to_avro(data[, jsonFormatSchema])
to_avro
Converts a column into binary of avro format.
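To illustrate a few of the functions listed above, here is a minimal sketch (the DataFrame, column names, and sample rows are made up for illustration) that combines to_date, datediff, current_date, when, and explode:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("functions-demo").getOrCreate()

# Hypothetical sample data: an id, a date string, and an array of tags
df = spark.createDataFrame(
    [(1, "2020-01-05", ["a", "b"]), (2, "2020-02-10", ["c"])],
    ["id", "event_date", "tags"],
)

result = (df
    .withColumn("event_date", F.to_date("event_date", "yyyy-MM-dd"))
    .withColumn("days_since", F.datediff(F.current_date(), F.col("event_date")))
    .withColumn("age_bucket", F.when(F.col("days_since") > 30, "old").otherwise("recent"))
    .withColumn("tag", F.explode("tags")))

result.show()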
Window.currentRow
A constant denoting the current row, for use as a window frame boundary.
Window.orderBy(*cols)
Window.orderBy
Creates a WindowSpec with the ordering defined.
Window.partitionBy(*cols)
Window.partitionBy
Creates a WindowSpec with the partitioning defined.
Window.rangeBetween(start, end)
Window.rangeBetween
Creates a WindowSpec with the frame boundaries defined, from start (inclusive) to end (inclusive).
Window.rowsBetween(start, end)
Window.rowsBetween
Creates a WindowSpec with the frame boundaries defined, from start (inclusive) to end (inclusive).
Window.unboundedFollowing
A constant denoting the last row of the partition, for use as a window frame boundary.
Window.unboundedPreceding
A constant denoting the first row of the partition, for use as a window frame boundary.
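As a quick sketch of how the Window entries above fit together (the DataFrame and column names are hypothetical), a window specification is built once and then passed to over():

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

sales = spark.createDataFrame(
    [("a", 1, 10.0), ("a", 2, 20.0), ("b", 1, 5.0)],
    ["store", "day", "amount"],
)

# Running total per store, ordered by day, over all rows up to the current one
running = (Window.partitionBy("store")
                 .orderBy("day")
                 .rowsBetween(Window.unboundedPreceding, Window.currentRow))

sales.withColumn("running_total", F.sum("amount").over(running)).show()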
GroupedData.agg(*exprs)
GroupedData.agg
Computes aggregates and returns the result as a DataFrame.
GroupedData.apply(udf)
GroupedData.apply
It is an alias of pyspark.sql.GroupedData.applyInPandas(); however, it takes a pyspark.sql.functions.pandas_udf() whereas pyspark.sql.GroupedData.applyInPandas() takes a Python native function.
GroupedData.applyInPandas(func, schema)
GroupedData.applyInPandas
Maps each group of the current DataFrame using a pandas udf and returns the result as a DataFrame.
GroupedData.avg(*cols)
GroupedData.avg
Computes average values for each numeric column for each group.
GroupedData.cogroup(other)
GroupedData.cogroup
Cogroups this group with another group so that we can run cogrouped operations.
GroupedData.count()
GroupedData.count
Counts the number of records for each group.
GroupedData.max(*cols)
GroupedData.max
Computes the max value for each numeric column for each group.
GroupedData.mean(*cols)
GroupedData.mean
Computes average values for each numeric column for each group (alias for GroupedData.avg).
GroupedData.min(*cols)
GroupedData.min
Computes the min value for each numeric column for each group.
GroupedData.pivot(pivot_col[, values])
GroupedData.pivot
Pivots a column of the current DataFrame and performs the specified aggregation.
GroupedData.sum(*cols)
GroupedData.sum
Computes the sum for each numeric column for each group.
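For example, a grouped aggregation and a grouped pandas UDF might be sketched as follows (the data and column names are illustrative, and applyInPandas requires pandas and PyArrow to be installed):

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([("a", 1.0), ("a", 3.0), ("b", 5.0)], ["key", "value"])

# Plain aggregation through GroupedData.agg
df.groupBy("key").agg(F.mean("value").alias("avg"), F.count("value").alias("n")).show()

# Grouped map: subtract the group mean from each value using applyInPandas
def center(pdf):
    pdf["value"] = pdf["value"] - pdf["value"].mean()
    return pdf

df.groupBy("key").applyInPandas(center, schema="key string, value double").show()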
Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write-Ahead Logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
Note that you can leverage almost all Spark SQL APIs in PySpark. This page describes Structured Streaming specific features.
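As a minimal sketch of what this looks like in PySpark (the socket source, host, and port are just for illustration; any supported streaming source works the same way):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("streaming-wordcount").getOrCreate()

# Read a stream of lines from a socket (hypothetical host/port)
lines = (spark.readStream
              .format("socket")
              .option("host", "localhost")
              .option("port", 9999)
              .load())

# The same DataFrame operations you would write for static data
counts = (lines.select(F.explode(F.split(lines.value, " ")).alias("word"))
               .groupBy("word")
               .count())

# Continuously print the updated counts to the console
query = (counts.writeStream
               .outputMode("complete")
               .format("console")
               .start())
query.awaitTermination()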
The classes below are core classes in Structured Streaming APIs at PySpark.
DataStreamReader(spark)
Interface used to load a streaming DataFrame from external storage systems (e.g. file systems, key-value stores).
DataStreamWriter(df)
DataStreamWriter
Interface used to write a streaming DataFrame to external storage systems (e.g. file systems, key-value stores).
ForeachBatchFunction(sql_ctx, func)
ForeachBatchFunction
This is the Python implementation of Java interface ‘ForeachBatchFunction’.
StreamingQuery(jsq)
A handle to a query that is executing continuously in the background as new data arrives.
StreamingQueryException(desc, stackTrace[, …])
StreamingQueryException
Exception that stopped a StreamingQuery.
StreamingQueryManager(jsqm)
A class to manage all active StreamingQuery instances.
DataStreamReader.csv(path[, schema, sep, …])
DataStreamReader.csv
Loads a CSV file stream and returns the result as a DataFrame.
DataStreamReader.format(source)
DataStreamReader.format
Specifies the input data source format.
DataStreamReader.json(path[, schema, …])
DataStreamReader.json
Loads a JSON file stream and returns the results as a DataFrame.
DataStreamReader.load([path, format, schema])
DataStreamReader.load
Loads a data stream from a data source and returns it as a DataFrame.
DataStreamReader.option(key, value)
DataStreamReader.option
Adds an input option for the underlying data source.
DataStreamReader.options(**options)
DataStreamReader.options
Adds input options for the underlying data source.
DataStreamReader.orc(path[, mergeSchema, …])
DataStreamReader.orc
Loads an ORC file stream, returning the result as a DataFrame.
DataStreamReader.parquet(path[, …])
DataStreamReader.parquet
Loads a Parquet file stream, returning the result as a DataFrame.
DataStreamReader.schema(schema)
DataStreamReader.schema
Specifies the input schema.
DataStreamReader.text(path[, wholetext, …])
DataStreamReader.text
Loads a text file stream and returns a DataFrame whose schema starts with a string column named “value”, and followed by partitioned columns if there are any.
DataStreamWriter.foreach(f)
DataStreamWriter.foreach
Sets the output of the streaming query to be processed using the provided writer f.
DataStreamWriter.foreachBatch(func)
DataStreamWriter.foreachBatch
Sets the output of the streaming query to be processed using the provided function.
DataStreamWriter.format(source)
DataStreamWriter.format
Specifies the underlying output data source.
DataStreamWriter.option(key, value)
DataStreamWriter.option
Adds an output option for the underlying data source.
DataStreamWriter.options(**options)
DataStreamWriter.options
Adds output options for the underlying data source.
DataStreamWriter.outputMode(outputMode)
DataStreamWriter.outputMode
Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
DataStreamWriter.partitionBy(*cols)
DataStreamWriter.partitionBy
Partitions the output by the given columns on the file system.
DataStreamWriter.queryName(queryName)
DataStreamWriter.queryName
Specifies the name of the StreamingQuery that can be started with start().
DataStreamWriter.start([path, format, …])
DataStreamWriter.start
Streams the contents of the DataFrame to a data source.
DataStreamWriter.trigger([processingTime, …])
DataStreamWriter.trigger
Set the trigger for the stream query.
StreamingQuery.awaitTermination([timeout])
StreamingQuery.awaitTermination
Waits for the termination of this query, either by query.stop() or by an exception.
StreamingQuery.exception()
StreamingQuery.exception
Returns the StreamingQueryException if the query was terminated by an exception, or None.
StreamingQuery.explain([extended])
StreamingQuery.explain
Prints the (logical and physical) plans to the console for debugging purposes.
StreamingQuery.id
Returns the unique id of this query that persists across restarts from checkpoint data.
StreamingQuery.isActive
Whether this streaming query is currently active or not.
StreamingQuery.lastProgress
Returns the most recent StreamingQueryProgress update of this streaming query, or None if there were no progress updates.
StreamingQuery.name
Returns the user-specified name of the query, or null if not specified.
StreamingQuery.processAllAvailable()
StreamingQuery.processAllAvailable
Blocks until all available data in the source has been processed and committed to the sink.
StreamingQuery.recentProgress
Returns an array of the most recent StreamingQueryProgress updates for this query.
StreamingQuery.runId
Returns the unique id of this query that does not persist across restarts.
StreamingQuery.status
Returns the current status of the query.
StreamingQuery.stop()
StreamingQuery.stop
Stop this streaming query.
StreamingQueryManager.active
Returns a list of active queries associated with this SQLContext
StreamingQueryManager.awaitAnyTermination([…])
StreamingQueryManager.awaitAnyTermination
Wait until any of the queries on the associated SQLContext has terminated since the creation of the context, or since resetTerminated() was called.
StreamingQueryManager.get(id)
StreamingQueryManager.get
Returns an active query from this SQLContext or throws exception if an active query with this name doesn’t exist.
StreamingQueryManager.resetTerminated()
StreamingQueryManager.resetTerminated
Forget about past terminated queries so that awaitAnyTermination() can be used again to wait for new terminations.
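Putting the reader, writer, and query-management entries above together, managing a running query might be sketched like this (the schema, paths, and query name are hypothetical):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Stream JSON files that land in an input directory (hypothetical paths)
stream = (spark.readStream
               .schema("id INT, value STRING")
               .json("/tmp/stream-input"))

query = (stream.writeStream
               .queryName("demo_query")
               .format("parquet")
               .option("path", "/tmp/stream-output")
               .option("checkpointLocation", "/tmp/stream-checkpoints")
               .trigger(processingTime="10 seconds")
               .start())

print(query.name, query.isActive)
print(query.status)

# All active queries are reachable through the StreamingQueryManager
for q in spark.streams.active:
    print(q.id, q.runId)

query.stop()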
To run individual PySpark tests, you can use the run-tests script under the python directory. Test cases are located in the tests package under each PySpark package. Note that if you change the Scala or Python side of Apache Spark, you need to rebuild Apache Spark manually before running the PySpark tests in order to apply the changes; the PySpark test script does not build it automatically.
Also, note that there is an ongoing issue with using PySpark on macOS High Sierra and later: OBJC_DISABLE_INITIALIZE_FORK_SAFETY should be set to YES in order to run some of the tests. See the PySpark issue and the Python issue for more details.
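For example, the variable can be exported in the shell before running the tests:
$ export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES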
To run test cases in a specific module:
$ python/run-tests --testnames pyspark.sql.tests.test_arrow
To run test cases in a specific class:
$ python/run-tests --testnames 'pyspark.sql.tests.test_arrow ArrowTests'
To run a single test case in a specific class:
$ python/run-tests --testnames 'pyspark.sql.tests.test_arrow ArrowTests.test_null_conversion'
You can also run doctests in a specific module:
$ python/run-tests --testnames pyspark.sql.dataframe
Lastly, there is another script called run-tests-with-coverage in the same location, which generates a coverage report for PySpark tests. It accepts the same arguments as run-tests.
$ python/run-tests-with-coverage --testnames pyspark.sql.tests.test_arrow --python-executables=python
...
Name                  Stmts   Miss Branch BrPart  Cover
-------------------------------------------------------------------
pyspark/__init__.py      42      4      8      2    84%
pyspark/_globals.py      16      3      4      2    75%
...
Generating HTML files for PySpark coverage under /…/spark/python/test_coverage/htmlcov
You can inspect the coverage report visually by opening the HTML files under /…/spark/python/test_coverage/htmlcov.
Please check other available options via python/run-tests[-with-coverage] --help.
python/run-tests[-with-coverage] --help
Import a project to PyCharm: File –> Open –> path_to_project.
After that, open ~/.bash_profile and add the lines below.
# Spark home
export SPARK_HOME=/.../spark-2.4.5-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH
Source the ~/.bash_profile to reflect the changes.
source ~/.bash_profile
Install pyspark and pypandoc: PyCharm –> Preferences –> Project Interpreter
Go to PyCharm –> Preferences –> Project Interpreter. Click on Add Content Root and add the paths set in the exports above ($SPARK_HOME/python and the py4j zip).
Restart PyCharm, and then run the project. You should be able to see output.
+---------+-----+
|     word|count|
+---------+-----+
|      ...|    5|
|     July|    2|
|       By|    1|
|   North,|    1|
|   taking|    1|
|    harry|   18|
|     #TBT|    1|
|  Potter:|    3|
|character|    1|
|        7|    2|
|  Phoenix|    1|
|   Number|    1|
|      day|    1|
|   (Video|    1|
| seconds)|    1|
| Hermione|    3|
|    Which|    1|
|      did|    1|
|   Potter|   38|
|Voldemort|    1|
+---------+-----+
only showing top 20 rows

Process finished with exit code 0
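For reference, a word-count script along these lines would produce output of this shape (the input file name and the exact script are hypothetical; the project's actual code may differ):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("wordcount").getOrCreate()

# Hypothetical input file containing the tweets being counted
lines = spark.read.text("tweets.txt")
words = lines.select(F.explode(F.split(F.col("value"), " ")).alias("word"))
words.groupBy("word").count().show()

spark.stop()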
We are happy to announce the availability of Spark 2.4.5! Visit the release notes to read about the new features, or download the release today.
To enable wide-scale community testing of the upcoming Spark 3.0 release, the Apache Spark community has posted a Spark 3.0.0 preview2 release. This preview is not a stable release in terms of either API or functionality, but it is meant to give the community early access to try the code that will become Spark 3.0. If you would like to test the release, please download it, and send feedback using either the mailing lists or JIRA. The documentation for the preview release is available on the Spark website.