pyspark.TaskContext

class pyspark.TaskContext

Contextual information about a task, which can be read or mutated during execution. To access the TaskContext for a running task, call TaskContext.get().

New in version 2.2.0.

Examples

>>> from pyspark import TaskContext

Get a task context instance from RDD.

>>> spark.sparkContext.setLocalProperty("key1", "value")
>>> taskcontext = spark.sparkContext.parallelize([1]).map(lambda _: TaskContext.get()).first()
>>> isinstance(taskcontext.attemptNumber(), int)
True
>>> isinstance(taskcontext.partitionId(), int)
True
>>> isinstance(taskcontext.stageId(), int)
True
>>> isinstance(taskcontext.taskAttemptId(), int)
True
>>> taskcontext.getLocalProperty("key1")
'value'
>>> isinstance(taskcontext.cpus(), int)
True

Get a task context instance from a dataframe via Python UDF.

>>> from pyspark.sql import Row
>>> from pyspark.sql.functions import udf
>>> @udf("STRUCT<anum: INT, partid: INT, stageid: INT, taskaid: INT, prop: STRING, cpus: INT>")
... def taskcontext_as_row():
...     taskcontext = TaskContext.get()
...     return Row(
...         anum=taskcontext.attemptNumber(),
...         partid=taskcontext.partitionId(),
...         stageid=taskcontext.stageId(),
...         taskaid=taskcontext.taskAttemptId(),
...         prop=taskcontext.getLocalProperty("key2"),
...         cpus=taskcontext.cpus())
...
>>> spark.sparkContext.setLocalProperty("key2", "value")
>>> [(anum, partid, stageid, taskaid, prop, cpus)] = (
...     spark.range(1).select(taskcontext_as_row()).first()
... )
>>> isinstance(anum, int)
True
>>> isinstance(partid, int)
True
>>> isinstance(stageid, int)
True
>>> isinstance(taskaid, int)
True
>>> prop
'value'
>>> isinstance(cpus, int)
True

Get a task context instance from a dataframe via Pandas UDF.

>>> import pandas as pd  
>>> from pyspark.sql.functions import pandas_udf
>>> @pandas_udf("STRUCT<"
...     "anum: INT, partid: INT, stageid: INT, taskaid: INT, prop: STRING, cpus: INT>")
... def taskcontext_as_row(_):
...     taskcontext = TaskContext.get()
...     return pd.DataFrame({
...         "anum": [taskcontext.attemptNumber()],
...         "partid": [taskcontext.partitionId()],
...         "stageid": [taskcontext.stageId()],
...         "taskaid": [taskcontext.taskAttemptId()],
...         "prop": [taskcontext.getLocalProperty("key3")],
...         "cpus": [taskcontext.cpus()]
...     })
...
>>> spark.sparkContext.setLocalProperty("key3", "value")  
>>> [(anum, partid, stageid, taskaid, prop, cpus)] = (
...     spark.range(1).select(taskcontext_as_row("id")).first()
... )  
>>> isinstance(anum, int)
True
>>> isinstance(partid, int)
True
>>> isinstance(stageid, int)
True
>>> isinstance(taskaid, int)
True
>>> prop
'value'
>>> isinstance(cpus, int)
True
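
Get a task context instance inside mapPartitions.

The examples above call TaskContext.get() inside code that Spark runs as a task. As a minimal sketch of the same idea with mapPartitions, the helpers below tag each record with the partition that produced it. The names summarize_task, tag_rows and demo are hypothetical and not part of PySpark. The helpers accept any object that exposes the TaskContext methods, and treat None (what TaskContext.get() returns when no task is running) as "no context".

```python
from typing import Any, Dict, Iterable, Iterator, Optional, Tuple


def summarize_task(ctx: Optional[Any]) -> Optional[Dict[str, int]]:
    """Collect the identifying fields of a TaskContext into a plain dict.

    `ctx` is whatever TaskContext.get() returned; None means no task is running.
    """
    if ctx is None:
        return None
    return {
        "stage_id": ctx.stageId(),
        "partition_id": ctx.partitionId(),
        "attempt_number": ctx.attemptNumber(),
        "task_attempt_id": ctx.taskAttemptId(),
    }


def tag_rows(
    rows: Iterable[Any], ctx: Optional[Any]
) -> Iterator[Tuple[Optional[int], Any]]:
    """Pair every row with the ID of the partition being computed."""
    partition_id = None if ctx is None else ctx.partitionId()
    for row in rows:
        yield (partition_id, row)


def demo(spark: Any) -> list:
    """Hypothetical driver-side wiring; `spark` is an existing SparkSession."""
    # Imported here (an assumption of this sketch) so the helpers above
    # can be loaded and tested without a Spark installation.
    from pyspark import TaskContext

    rdd = spark.sparkContext.parallelize(range(6), 3)
    # The lambda runs on the executor, so TaskContext.get() is evaluated
    # inside the task rather than on the driver.
    return rdd.mapPartitions(
        lambda rows: tag_rows(rows, TaskContext.get())
    ).collect()
```

Passing the context in as an argument, instead of calling TaskContext.get() inside tag_rows, keeps the helper independent of Spark so it can be exercised with a stand-in object. With a live session, demo(spark) returns the (partition ID, value) pairs collected from all three partitions.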

Methods

attemptNumber()

How many times this task has been attempted. The first attempt is assigned attempt number 0, and each subsequent attempt gets a higher number.

cpus()

Number of CPUs allocated to the task.

get()

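Return the currently active TaskContext. Call this on a worker, inside a running task; it returns None if no TaskContext has been initialized, as is the case on the driver.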

getLocalProperty(key)

Get a local property set upstream in the driver, or None if it is missing.

partitionId()

The ID of the RDD partition that is computed by this task.

resources()

Resources allocated to the task, as a dict that maps each resource name to its ResourceInformation.

stageId()

The ID of the stage that this task belongs to.

taskAttemptId()

An ID that is unique to this task attempt (within the same SparkContext, no two task attempts will share the same attempt ID).
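
The accessors listed above are often wrapped in small helpers. The following is a sketch under stated assumptions, not part of the PySpark API: is_retry and resource_addresses are hypothetical names, and "gpu" is only an assumed resource name that depends on how the cluster is configured. It relies on attemptNumber() starting at 0 for the first attempt, and on resources() returning a dict whose values expose an addresses attribute, as ResourceInformation does.

```python
from typing import Any, List


def is_retry(ctx: Any) -> bool:
    """True when the running task is a re-execution of an earlier attempt."""
    # attemptNumber() is 0 for the first attempt of a task.
    return ctx.attemptNumber() > 0


def resource_addresses(ctx: Any, name: str = "gpu") -> List[str]:
    """Addresses of the named resource assigned to this task, or [] if none.

    `name` defaults to "gpu" purely for illustration; use whatever resource
    name the job was configured with.
    """
    info = ctx.resources().get(name)
    return list(info.addresses) if info is not None else []
```

Inside a task, both helpers would be called with the object returned by TaskContext.get(), for example to skip a non-idempotent side effect when is_retry(...) is true.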