pyspark.sql module

Module Context

Public classes of Spark SQL:

  • SQLContext Main entry point for SQL functionality.
  • DataFrame A Resilient Distributed Dataset (RDD) with schema information for the data it contains. In addition to normal RDD operations, DataFrames also support SQL.
  • GroupedData Aggregation methods, returned by DataFrame.groupBy().
  • Column A column expression in a DataFrame.
  • Row A row of data returned by a Spark SQL query.
  • HiveContext Main entry point for accessing data stored in Apache Hive.
class pyspark.sql.SQLContext(sparkContext, sqlContext=None)

Main entry point for Spark SQL functionality.

A SQLContext can be used to create DataFrames, register DataFrames as tables, execute SQL over tables, cache tables, and read Parquet files.

applySchema(rdd, schema)

Applies the given schema to the given RDD of tuple or list.

Note: Deprecated in 1.3, use createDataFrame() instead.

These tuples or lists can contain complex nested structures like lists, maps or nested rows.

The schema should be a StructType.

It is important that the schema matches the types of the objects in each row or exceptions could be thrown at runtime.

>>> from pyspark.sql.types import *
>>> rdd2 = sc.parallelize([(1, "row1"), (2, "row2"), (3, "row3")])
>>> schema = StructType([StructField("field1", IntegerType(), False),
...     StructField("field2", StringType(), False)])
>>> df = sqlCtx.applySchema(rdd2, schema)
>>> df.collect()
[Row(field1=1, field2=u'row1'),..., Row(field1=3, field2=u'row3')]
cacheTable(tableName)

Caches the specified table in-memory.

clearCache()

Removes all cached tables from the in-memory cache.

createDataFrame(data, schema=None, samplingRatio=None)

Create a DataFrame from an RDD of tuple/list, list or pandas.DataFrame.

schema could be StructType or a list of column names.

When schema is a list of column names, the type of each column will be inferred from data.

When schema is None, it will try to infer the column names and types from data, which should be an RDD of Row, namedtuple, or dict.

If schema inference is needed, samplingRatio determines the ratio of rows used for inference; if samplingRatio is None, only the first row is used.

Parameters:
  • data – an RDD of Row/tuple/list/dict, list, or pandas.DataFrame
  • schema – a StructType or list of names of columns
  • samplingRatio – the sample ratio of rows used for inferring
Returns:

a DataFrame

>>> l = [('Alice', 1)]
>>> sqlCtx.createDataFrame(l).collect()
[Row(_1=u'Alice', _2=1)]
>>> sqlCtx.createDataFrame(l, ['name', 'age']).collect()
[Row(name=u'Alice', age=1)]
>>> d = [{'name': 'Alice', 'age': 1}]
>>> sqlCtx.createDataFrame(d).collect()
[Row(age=1, name=u'Alice')]
>>> rdd = sc.parallelize(l)
>>> sqlCtx.createDataFrame(rdd).collect()
[Row(_1=u'Alice', _2=1)]
>>> df = sqlCtx.createDataFrame(rdd, ['name', 'age'])
>>> df.collect()
[Row(name=u'Alice', age=1)]
>>> from pyspark.sql import Row
>>> Person = Row('name', 'age')
>>> person = rdd.map(lambda r: Person(*r))
>>> df2 = sqlCtx.createDataFrame(person)
>>> df2.collect()
[Row(name=u'Alice', age=1)]
>>> from pyspark.sql.types import *
>>> schema = StructType([
...    StructField("name", StringType(), True),
...    StructField("age", IntegerType(), True)])
>>> df3 = sqlCtx.createDataFrame(rdd, schema)
>>> df3.collect()
[Row(name=u'Alice', age=1)]
>>> sqlCtx.createDataFrame(df.toPandas()).collect()  
[Row(name=u'Alice', age=1)]
createExternalTable(tableName, path=None, source=None, schema=None, **options)

Creates an external table based on the dataset in a data source.

It returns the DataFrame associated with the external table.

The data source is specified by the source and a set of options. If source is not specified, the default data source configured by spark.sql.sources.default will be used.

Optionally, a schema can be provided as the schema of the returned DataFrame and created external table.
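
A minimal sketch (the table name and path below are hypothetical), assuming a Parquet dataset already exists at that location:

# Register an external table backed by an existing Parquet directory and
# get back the corresponding DataFrame.
df = sqlCtx.createExternalTable("people_ext",
                                path="/tmp/people_parquet",
                                source="parquet")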

getConf(key, defaultValue)

Returns the value of Spark SQL configuration property for the given key.

If the key is not set, returns defaultValue.

inferSchema(rdd, samplingRatio=None)

Infer and apply a schema to an RDD of Row.

Note: Deprecated in 1.3, use createDataFrame() instead.

When samplingRatio is specified, the schema is inferred by looking at the types of each row in the sampled dataset. Otherwise, the first 100 rows of the RDD are inspected. Nested collections are supported, which can include array, dict, list, Row, tuple, namedtuple, or object.

Each row can be a pyspark.sql.Row object, a namedtuple, or a plain object. Using top-level dicts is deprecated, as dict is used to represent Maps.

If a single column has multiple distinct inferred types, it may cause runtime exceptions.

>>> rdd = sc.parallelize(
...     [Row(field1=1, field2="row1"),
...      Row(field1=2, field2="row2"),
...      Row(field1=3, field2="row3")])
>>> df = sqlCtx.inferSchema(rdd)
>>> df.collect()[0]
Row(field1=1, field2=u'row1')
jsonFile(path, schema=None, samplingRatio=1.0)

Loads a text file storing one JSON object per line as a DataFrame.

If the schema is provided, applies the given schema to this JSON dataset.

Otherwise, it samples the dataset with ratio samplingRatio to determine the schema.

>>> import tempfile, shutil
>>> jsonFile = tempfile.mkdtemp()
>>> shutil.rmtree(jsonFile)
>>> with open(jsonFile, 'w') as f:
...     f.writelines(jsonStrings)
>>> df1 = sqlCtx.jsonFile(jsonFile)
>>> df1.printSchema()
root
 |-- field1: long (nullable = true)
 |-- field2: string (nullable = true)
 |-- field3: struct (nullable = true)
 |    |-- field4: long (nullable = true)
>>> from pyspark.sql.types import *
>>> schema = StructType([
...     StructField("field2", StringType()),
...     StructField("field3",
...         StructType([StructField("field5", ArrayType(IntegerType()))]))])
>>> df2 = sqlCtx.jsonFile(jsonFile, schema)
>>> df2.printSchema()
root
 |-- field2: string (nullable = true)
 |-- field3: struct (nullable = true)
 |    |-- field5: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)
jsonRDD(rdd, schema=None, samplingRatio=1.0)

Loads an RDD storing one JSON object per string as a DataFrame.

If the schema is provided, applies the given schema to this JSON dataset.

Otherwise, it samples the dataset with ratio samplingRatio to determine the schema.

>>> df1 = sqlCtx.jsonRDD(json)
>>> df1.first()
Row(field1=1, field2=u'row1', field3=Row(field4=11, field5=None), field6=None)
>>> df2 = sqlCtx.jsonRDD(json, df1.schema)
>>> df2.first()
Row(field1=1, field2=u'row1', field3=Row(field4=11, field5=None), field6=None)
>>> from pyspark.sql.types import *
>>> schema = StructType([
...     StructField("field2", StringType()),
...     StructField("field3",
...                 StructType([StructField("field5", ArrayType(IntegerType()))]))
... ])
>>> df3 = sqlCtx.jsonRDD(json, schema)
>>> df3.first()
Row(field2=u'row1', field3=Row(field5=None))
load(path=None, source=None, schema=None, **options)

Returns the dataset in a data source as a DataFrame.

The data source is specified by the source and a set of options. If source is not specified, the default data source configured by spark.sql.sources.default will be used.

Optionally, a schema can be provided as the schema of the returned DataFrame.
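
A minimal sketch (the path is hypothetical), loading a Parquet dataset through the generic data source API:

# Equivalent to parquetFile() for this case, but routed through the
# pluggable data source interface.
df = sqlCtx.load(path="/tmp/people_parquet", source="parquet")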

parquetFile(*paths)

Loads a Parquet file, returning the result as a DataFrame.

>>> import tempfile, shutil
>>> parquetFile = tempfile.mkdtemp()
>>> shutil.rmtree(parquetFile)
>>> df.saveAsParquetFile(parquetFile)
>>> df2 = sqlCtx.parquetFile(parquetFile)
>>> sorted(df.collect()) == sorted(df2.collect())
True
registerDataFrameAsTable(rdd, tableName)

Registers the given DataFrame as a temporary table in the catalog.

Temporary tables exist only during the lifetime of this instance of SQLContext.

>>> sqlCtx.registerDataFrameAsTable(df, "table1")
registerFunction(name, f, returnType=StringType)

Registers a lambda function as a UDF so it can be used in SQL statements.

In addition to a name and the function itself, the return type can be optionally specified. When the return type is not given, it defaults to a string and conversion will be done automatically. For any other return type, the produced object must match the specified type.

>>> sqlCtx.registerFunction("stringLengthString", lambda x: len(x))
>>> sqlCtx.sql("SELECT stringLengthString('test')").collect()
[Row(c0=u'4')]
>>> from pyspark.sql.types import IntegerType
>>> sqlCtx.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
>>> sqlCtx.sql("SELECT stringLengthInt('test')").collect()
[Row(c0=4)]
setConf(key, value)

Sets the given Spark SQL configuration property.
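
A minimal sketch combining setConf() and getConf() (the property value used here is only an example):

sqlCtx.setConf("spark.sql.shuffle.partitions", "10")
# Returns the value set above; the defaultValue "200" would be returned
# only if the key had not been set.
sqlCtx.getConf("spark.sql.shuffle.partitions", "200")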

sql(sqlQuery)

Return a DataFrame representing the result of the given query.

>>> sqlCtx.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2 from table1")
>>> df2.collect()
[Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
table(tableName)

Returns the specified table as a DataFrame.

>>> sqlCtx.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlCtx.table("table1")
>>> sorted(df.collect()) == sorted(df2.collect())
True
tableNames(dbName=None)

Returns a list of names of tables in the database dbName.

If dbName is not specified, the current database will be used.

>>> sqlCtx.registerDataFrameAsTable(df, "table1")
>>> "table1" in sqlCtx.tableNames()
True
>>> "table1" in sqlCtx.tableNames("db")
True
tables(dbName=None)

Returns a DataFrame containing names of tables in the given database.

If dbName is not specified, the current database will be used.

The returned DataFrame has two columns, tableName and isTemporary (a column with BooleanType indicating if a table is a temporary one or not).

>>> sqlCtx.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlCtx.tables()
>>> df2.filter("tableName = 'table1'").first()
Row(tableName=u'table1', isTemporary=True)
uncacheTable(tableName)

Removes the specified table from the in-memory cache.

class pyspark.sql.HiveContext(sparkContext, hiveContext=None)

A variant of Spark SQL that integrates with data stored in Hive.

Configuration for Hive is read from hive-site.xml on the classpath. It supports running both SQL and HiveQL commands.
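
A minimal sketch (the table name src is hypothetical), assuming an existing SparkContext sc and a working Hive configuration:

from pyspark.sql import HiveContext

hiveCtx = HiveContext(sc)
# HiveQL statements go through the same sql() entry point.
rows = hiveCtx.sql("SELECT key, value FROM src LIMIT 10").collect()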

class pyspark.sql.DataFrame(jdf, sql_ctx)

A collection of rows that have the same columns.

A DataFrame is equivalent to a relational table in Spark SQL, and can be created using various functions in SQLContext:

people = sqlContext.parquetFile("...")

Once created, it can be manipulated using the various domain-specific language (DSL) functions defined in: DataFrame, Column.

To select a column from the data frame, use the apply method:

ageCol = people.age

Note that the Column type can also be manipulated through its various functions:

# The following creates a new column that increases everybody's age by 10.
people.age + 10

A more concrete example:

# To create DataFrame using SQLContext
people = sqlContext.parquetFile("...")
department = sqlContext.parquetFile("...")

people.filter(people.age > 30).join(department, people.deptId == department.id) \
      .groupBy(department.name, "gender").agg({"salary": "avg", "age": "max"})
agg(*exprs)

Aggregate on the entire DataFrame without groups (shorthand for df.groupBy.agg()).

>>> df.agg({"age": "max"}).collect()
[Row(MAX(age#0)=5)]
>>> from pyspark.sql import functions as F
>>> df.agg(F.min(df.age)).collect()
[Row(MIN(age#0)=2)]
cache()

Persist with the default storage level (MEMORY_ONLY_SER).

collect()

Return a list that contains all of the rows.

Each object in the list is a Row; the fields can be accessed as attributes.

>>> df.collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
columns

Return all column names as a list.

>>> df.columns
[u'age', u'name']
count()

Return the number of rows in this DataFrame.

Unlike the base RDD implementation of count, this implementation leverages the query optimizer to compute the count on the DataFrame, which supports features such as filter pushdown.

>>> df.count()
2L
distinct()

Return a new DataFrame containing the distinct rows in this DataFrame.

>>> df.distinct().count()
2L
dtypes

Return all column names and their data types as a list.

>>> df.dtypes
[('age', 'int'), ('name', 'string')]
explain(extended=False)

Prints the plans (logical and physical) to the console for debugging purposes.

If extended is False, only prints the physical plan.

>>> df.explain()
PhysicalRDD [age#0,name#1], MapPartitionsRDD[...] at mapPartitions at SQLContext.scala:...
>>> df.explain(True)
== Parsed Logical Plan ==
...
== Analyzed Logical Plan ==
...
== Optimized Logical Plan ==
...
== Physical Plan ==
...
== RDD ==
filter(condition)

Filters rows using the given condition, which can be a Column of boolean expression or a string of SQL expression.

where() is an alias for filter().

>>> df.filter(df.age > 3).collect()
[Row(age=5, name=u'Bob')]
>>> df.where(df.age == 2).collect()
[Row(age=2, name=u'Alice')]
>>> df.filter("age > 3").collect()
[Row(age=5, name=u'Bob')]
>>> df.where("age = 2").collect()
[Row(age=2, name=u'Alice')]
first()

Return the first row.

>>> df.first()
Row(age=2, name=u'Alice')
flatMap(f)

Return a new RDD by first applying a function to all rows of this DataFrame, and then flattening the results.

It’s a shorthand for df.rdd.flatMap()

>>> df.flatMap(lambda p: p.name).collect()
[u'A', u'l', u'i', u'c', u'e', u'B', u'o', u'b']
foreach(f)

Applies a function to all rows of this DataFrame.

It’s a shorthand for df.rdd.foreach()

>>> def f(person):
...     print person.name
>>> df.foreach(f)
foreachPartition(f)

Applies a function to each partition of this DataFrame.

It’s a shorthand for df.rdd.foreachPartition()

>>> def f(people):
...     for person in people:
...         print person.name
>>> df.foreachPartition(f)
groupBy(*cols)

Group the DataFrame using the specified columns, so we can run aggregation on them. See GroupedData for all the available aggregate functions.

>>> df.groupBy().avg().collect()
[Row(AVG(age#0)=3.5)]
>>> df.groupBy('name').agg({'age': 'mean'}).collect()
[Row(name=u'Bob', AVG(age#0)=5.0), Row(name=u'Alice', AVG(age#0)=2.0)]
>>> df.groupBy(df.name).avg().collect()
[Row(name=u'Bob', AVG(age#0)=5.0), Row(name=u'Alice', AVG(age#0)=2.0)]
head(n=None)

Return the first n rows or the first row if n is None.

>>> df.head()
Row(age=2, name=u'Alice')
>>> df.head(1)
[Row(age=2, name=u'Alice')]
insertInto(tableName, overwrite=False)

Inserts the contents of this DataFrame into the specified table, optionally overwriting any existing data.
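
A minimal sketch (the table name is hypothetical), assuming a table with a compatible schema has already been registered:

df.insertInto("people_table")                  # append rows
df.insertInto("people_table", overwrite=True)  # replace existing data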

intersect(other)

Return a new DataFrame containing rows only in both this frame and another frame.

This is equivalent to INTERSECT in SQL.
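
A minimal sketch, where dfA and dfB are hypothetical DataFrames sharing the same schema:

# Rows that appear in both DataFrames.
common = dfA.intersect(dfB)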

isLocal()

Returns True if the collect and take methods can be run locally (without any Spark executors).

join(other, joinExprs=None, joinType=None)

Join with another DataFrame, using the given join expression. The following performs a full outer join between df and df2.

Parameters:
  • other – Right side of the join
  • joinExprs – Join expression
  • joinType – One of inner, outer, left_outer, right_outer, semijoin.
>>> df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height).collect()
[Row(name=None, height=80), Row(name=u'Bob', height=85), Row(name=u'Alice', height=None)]
limit(num)

Limit the result count to the number specified.

>>> df.limit(1).collect()
[Row(age=2, name=u'Alice')]
>>> df.limit(0).collect()
[]
map(f)

Return a new RDD by applying a function to each Row.

It’s a shorthand for df.rdd.map()

>>> df.map(lambda p: p.name).collect()
[u'Alice', u'Bob']
mapPartitions(f, preservesPartitioning=False)

Return a new RDD by applying a function to each partition.

It’s a shorthand for df.rdd.mapPartitions()

>>> rdd = sc.parallelize([1, 2, 3, 4], 4)
>>> def f(iterator): yield 1
>>> rdd.mapPartitions(f).sum()
4
orderBy(*cols)

Return a new DataFrame sorted by the specified column(s).

Parameters:
  • cols – The columns or expressions used for sorting
>>> df.sort(df.age.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> df.orderBy(df.age.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> from pyspark.sql.functions import *
>>> df.sort(asc("age")).collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
>>> df.orderBy(desc("age"), "name").collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
persist(storageLevel=StorageLevel(False, True, False, False, 1))

Set the storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the DataFrame does not have a storage level set yet. If no storage level is specified, it defaults to MEMORY_ONLY_SER.
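
A minimal sketch passing an explicit storage level instead of the default:

from pyspark import StorageLevel

# Keep the data in memory and spill to disk when it does not fit.
df.persist(StorageLevel.MEMORY_AND_DISK)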

printSchema()

Prints out the schema in the tree format.

>>> df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)
rdd

Return the content of the DataFrame as an RDD of Row objects.

registerAsTable(name)

DEPRECATED: use registerTempTable() instead

registerTempTable(name)

Registers this DataFrame as a temporary table using the given name.

The lifetime of this temporary table is tied to the SQLContext that was used to create this DataFrame.

>>> df.registerTempTable("people")
>>> df2 = sqlCtx.sql("select * from people")
>>> sorted(df.collect()) == sorted(df2.collect())
True
repartition(numPartitions)

Return a new DataFrame that has exactly numPartitions partitions.

>>> df.repartition(10).rdd.getNumPartitions()
10
sample(withReplacement, fraction, seed=None)

Return a sampled subset of this DataFrame.

>>> df.sample(False, 0.5, 97).count()
1L
save(path=None, source=None, mode='append', **options)

Saves the contents of the DataFrame to a data source.

The data source is specified by the source and a set of options. If source is not specified, the default data source configured by spark.sql.sources.default will be used.

Additionally, mode is used to specify the behavior of the save operation when data already exists in the data source. There are four modes:

  • append: Contents of this DataFrame are expected to be appended to existing data.
  • overwrite: Existing data is expected to be overwritten by the contents of this DataFrame.
  • error: An exception is expected to be thrown.
  • ignore: The save operation is expected to not save the contents of the DataFrame and to not change the existing data.
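
A minimal sketch (the output path is hypothetical), writing the DataFrame as Parquet and replacing any existing data at that location:

df.save(path="/tmp/people_out", source="parquet", mode="overwrite")
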
saveAsParquetFile(path)

Save the contents as a Parquet file, preserving the schema.

Files that are written out using this method can be read back in as a DataFrame using the SQLContext.parquetFile method.

>>> import tempfile, shutil
>>> parquetFile = tempfile.mkdtemp()
>>> shutil.rmtree(parquetFile)
>>> df.saveAsParquetFile(parquetFile)
>>> df2 = sqlCtx.parquetFile(parquetFile)
>>> sorted(df2.collect()) == sorted(df.collect())
True
saveAsTable(tableName, source=None, mode='append', **options)

Saves the contents of the DataFrame to a data source as a table.

The data source is specified by the source and a set of options. If source is not specified, the default data source configured by spark.sql.sources.default will be used.

Additionally, mode is used to specify the behavior of the saveAsTable operation when table already exists in the data source. There are four modes:

  • append: Contents of this DataFrame are expected to be appended to existing table.
  • overwrite: Data in the existing table is expected to be overwritten by the contents of this DataFrame.
  • error: An exception is expected to be thrown.
  • ignore: The save operation is expected to not save the contents of the DataFrame and to not change the existing table.
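
A minimal sketch (the table name is hypothetical), creating a table with the default data source and failing if it already exists:

df.saveAsTable("people_backup", mode="error")
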
schema

Returns the schema of this DataFrame (represented by a StructType).

>>> df.schema
StructType(List(StructField(age,IntegerType,true),StructField(name,StringType,true)))
select(*cols)

Selects a set of columns or expressions and returns a new DataFrame.

>>> df.select('*').collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
>>> df.select('name', 'age').collect()
[Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
>>> df.select(df.name, (df.age + 10).alias('age')).collect()
[Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
selectExpr(*expr)

Selects a set of SQL expressions. This is a variant of select that accepts SQL expressions.

>>> df.selectExpr("age * 2", "abs(age)").collect()
[Row((age * 2)=4, Abs(age)=2), Row((age * 2)=10, Abs(age)=5)]
show(n=20)

Print the first n rows.

>>> df
DataFrame[age: int, name: string]
>>> df.show()
age name
2   Alice
5   Bob
sort(*cols)

Return a new DataFrame sorted by the specified column(s).

Parameters:
  • cols – The columns or expressions used for sorting
>>> df.sort(df.age.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> df.orderBy(df.age.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> from pyspark.sql.functions import *
>>> df.sort(asc("age")).collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
>>> df.orderBy(desc("age"), "name").collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
subtract(other)

Return a new DataFrame containing rows in this frame but not in another frame.

This is equivalent to EXCEPT in SQL.

take(num)

Take the first num rows of the DataFrame.

Each object in the list is a Row; the fields can be accessed as attributes.

>>> df.take(2)
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
toJSON(use_unicode=False)

Convert a DataFrame into a MappedRDD of JSON documents; one document per row.

>>> df.toJSON().first()
'{"age":2,"name":"Alice"}'
toPandas()

Collect all the rows and return a pandas.DataFrame.

>>> df.toPandas()  
   age   name
0    2  Alice
1    5    Bob
unionAll(other)

Return a new DataFrame containing union of rows in this frame and another frame.

This is equivalent to UNION ALL in SQL.

unpersist(blocking=True)

Marks the DataFrame as non-persistent, and removes all blocks for it from memory and disk.

where(condition)

Filters rows using the given condition, which can be a Column of boolean expression or a string of SQL expression.

where() is an alias for filter().

>>> df.filter(df.age > 3).collect()
[Row(age=5, name=u'Bob')]
>>> df.where(df.age == 2).collect()
[Row(age=2, name=u'Alice')]
>>> df.filter("age > 3").collect()
[Row(age=5, name=u'Bob')]
>>> df.where("age = 2").collect()
[Row(age=2, name=u'Alice')]
withColumn(colName, col)

Return a new DataFrame by adding a column.

>>> df.withColumn('age2', df.age + 2).collect()
[Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
withColumnRenamed(existing, new)

Rename an existing column to a new name.

>>> df.withColumnRenamed('age', 'age2').collect()
[Row(age2=2, name=u'Alice'), Row(age2=5, name=u'Bob')]
class pyspark.sql.GroupedData(jdf, sql_ctx)

A set of methods for aggregations on a DataFrame, created by DataFrame.groupBy().

agg(*exprs)

Compute aggregates by specifying a map from column name to aggregate methods.

The available aggregate methods are avg, max, min, sum, count.

Parameters:
  • exprs – a list of aggregate Column expressions, or a dict mapping column name to aggregate method
>>> gdf = df.groupBy(df.name)
>>> gdf.agg({"*": "count"}).collect()
[Row(name=u'Bob', COUNT(1)=1), Row(name=u'Alice', COUNT(1)=1)]
>>> from pyspark.sql import functions as F
>>> gdf.agg(F.min(df.age)).collect()
[Row(MIN(age#0)=5), Row(MIN(age#0)=2)]
avg(*args)

Compute the average value for each numeric column for each group.

>>> df.groupBy().avg('age').collect()
[Row(AVG(age#0)=3.5)]
>>> df3.groupBy().avg('age', 'height').collect()
[Row(AVG(age#4L)=3.5, AVG(height#5L)=82.5)]
count()

Count the number of rows for each group.

>>> df.groupBy(df.age).count().collect()
[Row(age=2, count=1), Row(age=5, count=1)]
max(*args)

Compute the max value for each numeric column for each group.

>>> df.groupBy().max('age').collect()
[Row(MAX(age#0)=5)]
>>> df3.groupBy().max('age', 'height').collect()
[Row(MAX(age#4L)=5, MAX(height#5L)=85)]
mean(*args)

Compute the average value for each numeric column for each group. This is an alias for avg.

>>> df.groupBy().mean('age').collect()
[Row(AVG(age#0)=3.5)]
>>> df3.groupBy().mean('age', 'height').collect()
[Row(AVG(age#4L)=3.5, AVG(height#5L)=82.5)]
min(*args)

Compute the min value for each numeric column for each group.

>>> df.groupBy().min('age').collect()
[Row(MIN(age#0)=2)]
>>> df3.groupBy().min('age', 'height').collect()
[Row(MIN(age#4L)=2, MIN(height#5L)=80)]
sum(*args)

Compute the sum for each numeric column for each group.

>>> df.groupBy().sum('age').collect()
[Row(SUM(age#0)=7)]
>>> df3.groupBy().sum('age', 'height').collect()
[Row(SUM(age#4L)=7, SUM(height#5L)=165)]
class pyspark.sql.Column(jc)

A column in a DataFrame.

Column instances can be created by:

# 1. Select a column out of a DataFrame

df.colName
df["colName"]

# 2. Create from an expression
df.colName + 1
1 / df.colName
alias(alias)

Return an alias for this column.

>>> df.select(df.age.alias("age2")).collect()
[Row(age2=2), Row(age2=5)]
asc()

Returns a sort expression based on the ascending order of this column.

cast(dataType)

Convert the column into type dataType.

>>> df.select(df.age.cast("string").alias('ages')).collect()
[Row(ages=u'2'), Row(ages=u'5')]
>>> df.select(df.age.cast(StringType()).alias('ages')).collect()
[Row(ages=u'2'), Row(ages=u'5')]
desc()

Returns a sort expression based on the descending order of this column.

endswith(other)

Binary operator: returns a boolean Column indicating whether the string value ends with other.

getField(other)

An expression that gets a field by name out of a struct (StructType) column.

isNotNull()

True if the current expression is not null.

isNull()

True if the current expression is null.

like(other)

Binary operator: matches the column against a SQL LIKE pattern given by other.

rlike(other)

Binary operator: matches the column against a regular expression (SQL RLIKE) given by other.

startswith(other)

Binary operator: returns a boolean Column indicating whether the string value starts with other.
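
A minimal sketch showing these string predicates used inside filter() (the patterns are illustrative only):

df.filter(df.name.startswith("Al")).collect()
df.filter(df.name.endswith("ce")).collect()
df.filter(df.name.like("%ice")).collect()     # SQL LIKE pattern
df.filter(df.name.rlike("^A.*e$")).collect()  # regular expression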

substr(startPos, length)

Return a Column which is a substring of the column.

Parameters:
  • startPos – start position (int or Column)
  • length – length of the substring (int or Column)
>>> df.select(df.name.substr(1, 3).alias("col")).collect()
[Row(col=u'Ali'), Row(col=u'Bob')]
class pyspark.sql.Row

A row in a DataFrame. The fields in it can be accessed like attributes.

Row can be used to create a row object by using named arguments; the fields will be sorted by name.

>>> row = Row(name="Alice", age=11)
>>> row
Row(age=11, name='Alice')
>>> row.name, row.age
('Alice', 11)

Row can also be used to create another Row-like class, which can then be used to create Row objects:

>>> Person = Row("name", "age")
>>> Person
<Row(name, age)>
>>> Person("Alice", 11)
Row(name='Alice', age=11)
asDict()

Return as a dict.
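
A minimal sketch:

# The keys come from the Row's field names.
Row(name="Alice", age=11).asDict()  # {'age': 11, 'name': 'Alice'}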

pyspark.sql.types module

class pyspark.sql.types.DataType[source]

Spark SQL DataType

json()[source]
jsonValue()[source]
simpleString()[source]
classmethod typeName()[source]
class pyspark.sql.types.NullType[source]

Spark SQL NullType

The data type representing None, used for types that have not been inferred.

class pyspark.sql.types.StringType[source]

Spark SQL StringType

The data type representing string values.

class pyspark.sql.types.BinaryType[source]

Spark SQL BinaryType

The data type representing bytearray values.

class pyspark.sql.types.BooleanType[source]

Spark SQL BooleanType

The data type representing bool values.

class pyspark.sql.types.DateType[source]

Spark SQL DateType

The data type representing datetime.date values.

class pyspark.sql.types.TimestampType[source]

Spark SQL TimestampType

The data type representing datetime.datetime values.

class pyspark.sql.types.DecimalType(precision=None, scale=None)[source]

Spark SQL DecimalType

The data type representing decimal.Decimal values.

jsonValue()[source]
simpleString()[source]
class pyspark.sql.types.DoubleType[source]

Spark SQL DoubleType

The data type representing float values.

class pyspark.sql.types.FloatType[source]

Spark SQL FloatType

The data type representing single precision floating-point values.

class pyspark.sql.types.ByteType[source]

Spark SQL ByteType

The data type representing int values stored in 1 signed byte.

simpleString()[source]
class pyspark.sql.types.IntegerType[source]

Spark SQL IntegerType

The data type representing int values.

simpleString()[source]
class pyspark.sql.types.LongType[source]

Spark SQL LongType

The data type representing long values. If any value is beyond the range of [-9223372036854775808, 9223372036854775807], please use DecimalType.

simpleString()[source]
class pyspark.sql.types.ShortType[source]

Spark SQL ShortType

The data type representing int values with 2 signed bytes.

simpleString()[source]
class pyspark.sql.types.ArrayType(elementType, containsNull=True)[source]

Spark SQL ArrayType

The data type representing list values. An ArrayType object comprises two fields, elementType (a DataType) and containsNull (a bool). The field of elementType is used to specify the type of array elements. The field of containsNull is used to specify if the array can contain None values.

classmethod fromJson(json)[source]
jsonValue()[source]
simpleString()[source]
class pyspark.sql.types.MapType(keyType, valueType, valueContainsNull=True)[source]

Spark SQL MapType

The data type representing dict values. A MapType object comprises three fields, keyType (a DataType), valueType (a DataType) and valueContainsNull (a bool).

The field of keyType is used to specify the type of keys in the map. The field of valueType is used to specify the type of values in the map. The field of valueContainsNull is used to specify if values of this map can contain None values.

For values of a MapType column, keys are not allowed to have None values.

classmethod fromJson(json)[source]
jsonValue()[source]
simpleString()[source]
class pyspark.sql.types.StructField(name, dataType, nullable=True, metadata=None)[source]

Spark SQL StructField

Represents a field in a StructType. A StructField object comprises three fields, name (a string), dataType (a DataType) and nullable (a bool). The field of name is the name of a StructField. The field of dataType specifies the data type of a StructField.

The field of nullable specifies if values of a StructField can contain None values.

classmethod fromJson(json)[source]
jsonValue()[source]
simpleString()[source]
class pyspark.sql.types.StructType(fields)[source]

Spark SQL StructType

The data type representing rows. A StructType object comprises a list of StructField.

classmethod fromJson(json)[source]
jsonValue()[source]
simpleString()[source]
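
A minimal sketch (the field names are hypothetical) showing how the composite types above combine into a single schema:

from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, ArrayType, MapType)

schema = StructType([
    StructField("name", StringType(), False),
    StructField("scores", ArrayType(IntegerType()), True),
    StructField("attrs", MapType(StringType(), StringType()), True)])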

pyspark.sql.functions module

A collection of builtin functions.

pyspark.sql.functions.abs(col)

Computes the absolute value.

pyspark.sql.functions.approxCountDistinct(col, rsd=None)[source]

Return a new Column for approximate distinct count of col

>>> df.agg(approxCountDistinct(df.age).alias('c')).collect()
[Row(c=2)]
pyspark.sql.functions.asc(col)

Returns a sort expression based on the ascending order of the given column name.

pyspark.sql.functions.avg(col)

Aggregate function: returns the average of the values in a group.

pyspark.sql.functions.col(col)

Returns a Column based on the given column name.

pyspark.sql.functions.column(col)

Returns a Column based on the given column name.

pyspark.sql.functions.count(col)

Aggregate function: returns the number of items in a group.

pyspark.sql.functions.countDistinct(col, *cols)[source]

Return a new Column for distinct count of col or cols

>>> df.agg(countDistinct(df.age, df.name).alias('c')).collect()
[Row(c=2)]
>>> df.agg(countDistinct("age", "name").alias('c')).collect()
[Row(c=2)]
pyspark.sql.functions.desc(col)

Returns a sort expression based on the descending order of the given column name.

pyspark.sql.functions.first(col)

Aggregate function: returns the first value in a group.

pyspark.sql.functions.last(col)

Aggregate function: returns the last value in a group.

pyspark.sql.functions.lit(col)

Creates a Column of literal value.
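
A minimal sketch adding a constant column (the column name is arbitrary):

from pyspark.sql.functions import lit

df.select(df.name, lit(1).alias("one"))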

pyspark.sql.functions.lower(col)

Converts a string expression to lower case.

pyspark.sql.functions.max(col)

Aggregate function: returns the maximum value of the expression in a group.

pyspark.sql.functions.mean(col)

Aggregate function: returns the average of the values in a group.

pyspark.sql.functions.min(col)

Aggregate function: returns the minimum value of the expression in a group.

pyspark.sql.functions.sqrt(col)

Computes the square root of the specified float value.

pyspark.sql.functions.sum(col)

Aggregate function: returns the sum of all values in the expression.

pyspark.sql.functions.sumDistinct(col)

Aggregate function: returns the sum of distinct values in the expression.

pyspark.sql.functions.udf(f, returnType=StringType)[source]

Create a user defined function (UDF)

>>> from pyspark.sql.types import IntegerType
>>> slen = udf(lambda s: len(s), IntegerType())
>>> df.select(slen(df.name).alias('slen')).collect()
[Row(slen=5), Row(slen=3)]
pyspark.sql.functions.upper(col)

Converts a string expression to upper case.
