

_StepInfo Objects

class _StepInfo(NamedTuple)



ids of the loaded packages


Information on loaded packages

StepMetrics Objects

class StepMetrics(TypedDict)


Metrics for particular package processed in particular pipeline step


Start of package processing


End of package processing

StepInfo Objects

class StepInfo(SupportsHumanize, Generic[TStepMetricsCo])



Metrics per load id. If many sources with the same name were extracted, there will be more than 1 element in the list


ids of the loaded packages


Information on loaded packages


def started_at() -> datetime.datetime


Returns the earliest start date of all collected metrics


def finished_at() -> datetime.datetime


Returns the latest end date of all collected metrics

ExtractMetrics Objects

class ExtractMetrics(StepMetrics)



Metrics collected per job id during writing of job file


Job metrics aggregated by table


Job metrics aggregated by resource


A resource dag where elements of the list are graph edges


Hints passed to the resources

_ExtractInfo Objects

class _ExtractInfo(NamedTuple)


NamedTuple cannot be part of the derivation chain so we must re-declare all fields to use it as mixin later


ids of the loaded packages


Information on loaded packages

ExtractInfo Objects

class ExtractInfo(StepInfo[ExtractMetrics], _ExtractInfo)


A tuple holding information on extracted data items. Returned by pipeline extract method.


def asdict() -> DictStrAny


A dictionary representation of ExtractInfo that can be loaded with dlt

NormalizeMetrics Objects

class NormalizeMetrics(StepMetrics)



Metrics collected per job id during writing of job file


Job metrics aggregated by table

_NormalizeInfo Objects

class _NormalizeInfo(NamedTuple)



ids of the loaded packages


Information on loaded packages

NormalizeInfo Objects

class NormalizeInfo(StepInfo[NormalizeMetrics], _NormalizeInfo)


A tuple holding information on normalized data items. Returned by pipeline normalize method.


def asdict() -> DictStrAny


A dictionary representation of NormalizeInfo that can be loaded with dlt

_LoadInfo Objects

class _LoadInfo(NamedTuple)



ids of the loaded packages


Information on loaded packages

LoadInfo Objects

class LoadInfo(StepInfo[LoadMetrics], _LoadInfo)


A tuple holding the information on recently loaded packages. Returned by pipeline run and load methods


def asdict() -> DictStrAny


A dictionary representation of LoadInfo that can be loaded with dlt


def has_failed_jobs() -> bool


Returns True if any of the load packages has a failed job.


def raise_on_failed_jobs() -> None


Raises DestinationHasFailedJobs exception if any of the load packages has a failed job.
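The relationship between the two methods can be modelled with a toy stand-in: one reports whether any package has a failed job, the other raises in the same situation. Every name below (`PackageSketch`, `LoadInfoSketch`, `FailedJobsSketchError`, the `failed_jobs` field) is hypothetical and only mirrors the behaviour described above; dlt's real classes carry more information.

```python
from dataclasses import dataclass, field
from typing import List


class FailedJobsSketchError(Exception):
    """Hypothetical stand-in for the DestinationHasFailedJobs exception."""


@dataclass
class PackageSketch:
    load_id: str
    # names of failed jobs in this package (assumed shape)
    failed_jobs: List[str] = field(default_factory=list)


@dataclass
class LoadInfoSketch:
    load_packages: List[PackageSketch]

    def has_failed_jobs(self) -> bool:
        # True as soon as one package reports a failed job
        return any(p.failed_jobs for p in self.load_packages)

    def raise_on_failed_jobs(self) -> None:
        # raise for the first package that has a failed job
        for p in self.load_packages:
            if p.failed_jobs:
                raise FailedJobsSketchError(
                    f"package {p.load_id} has failed jobs: {p.failed_jobs}"
                )


ok_info = LoadInfoSketch([PackageSketch("load_1")])
bad_info = LoadInfoSketch([PackageSketch("load_1"), PackageSketch("load_2", ["users.jsonl"])])
```

Calling `raise_on_failed_jobs()` right after a load is a way to turn silent job failures into a hard error, while `has_failed_jobs()` suits code that wants to branch instead.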

WithStepInfo Objects

class WithStepInfo(ABC, Generic[TStepMetrics, TStepInfo])


Implemented by classes that generate StepInfo with metrics and package infos


def current_load_id() -> str


Returns currently processing load id


def get_step_info(pipeline: "SupportsPipeline") -> TStepInfo


Returns an instance of StepInfo with metrics and package infos


TPipelineLocalState Objects

class TPipelineLocalState(TypedDict)



Indicates a first run of the pipeline, where run ends with successful loading of data

TPipelineState Objects

class TPipelineState(TypedDict)


Schema for a pipeline state that is stored within the pipeline working directory


Name of the first schema added to the pipeline to which all the resources without schemas will be added


All the schemas present within the pipeline working directory

TSourceState Objects

class TSourceState(TPipelineState)




SupportsPipeline Objects

class SupportsPipeline(Protocol)


A protocol with core pipeline operations that lets high-level abstractions, i.e. sources, access pipeline methods and properties


Name of the pipeline


Name of the default schema


The destination reference, which is a ModuleType. destination.__name__ returns the name as a string


Name of the dataset to which the pipeline will load data


A configuration of runtime options like logging level and format and various tracing options


A working directory of the pipeline


A configurable pipeline secret to be used as a salt or a seed for encryption key


Indicates a first run of the pipeline, where run ends with successful loading of the data


def state() -> TPipelineState


Returns dictionary with pipeline state


def schemas() -> Mapping[str, Schema]


Mapping of all pipeline schemas


def set_local_state_val(key: str, value: Any) -> None


Sets value in local state. Local state is not synchronized with destination.


def get_local_state_val(key: str) -> Any


Gets value from local state. Local state is not synchronized with destination.

PipelineContext Objects

class PipelineContext(ContainerInjectableContext)



def pipeline() -> SupportsPipeline


Creates or returns an existing pipeline


def __init__(
deferred_pipeline: Callable[..., SupportsPipeline] = None) -> None


Initialize the context with a function returning the Pipeline object to allow creation on first use


def pipeline_state(
container: Container,
initial_default: TPipelineState = None) -> Tuple[TPipelineState, bool]


Gets value of the state from context or active pipeline, if none found returns initial_default

Injected state is called "writable": it is injected by the Pipeline class and all the changes will be persisted. The state coming from pipeline context or initial_default is called "read only" and all the changes to it will be discarded

Returns tuple (state, writable)
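The lookup order and the meaning of the `writable` flag can be sketched in a few lines. This is a simplified model under stated assumptions: `resolve_state` and its three parameters are hypothetical, and the real function reads the injected state and the active pipeline from the container rather than from arguments.

```python
from typing import Any, Dict, Optional, Tuple

State = Dict[str, Any]


def resolve_state(
    injected: Optional[State],
    active_pipeline_state: Optional[State],
    initial_default: Optional[State] = None,
) -> Tuple[Optional[State], bool]:
    # state injected by the pipeline: changes will be persisted
    if injected is not None:
        return injected, True
    # state taken from the active pipeline: read only, changes are discarded
    if active_pipeline_state is not None:
        return active_pipeline_state, False
    # nothing found: fall back to the default, also read only
    return initial_default, False
```

A caller would check the second element of the tuple before relying on any write being saved.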


def source_state() -> DictStrAny


Returns a dictionary with the source-scoped state. Source-scoped state may be shared across the resources of a particular source. Please avoid using source scoped state. Check the resource_state function for resource-scoped state that is visible within particular resource. Dlt state is preserved across pipeline runs and may be used to implement incremental loads.


The source state is a python dictionary-like object that is available within the @dlt.source and @dlt.resource decorated functions and may be read and written to. The data within the state is loaded into destination together with any other extracted data and made automatically available to the source/resource extractor functions when they are run next time. When using the state:

  • The source state is scoped to a particular source and will be stored under the source name in the pipeline state
  • It is possible to share state across many sources if they share a schema with the same name
  • Any JSON-serializable values can be written to and then read from the state. dlt dumps and restores instances of Python bytes, DateTime, Date and Decimal types.
  • The state available in the source decorated function is read only and any changes will be discarded.
  • The state available in the resource decorated function is writable and written values will be available on the next pipeline run


def resource_state(resource_name: str = None,
source_state_: Optional[DictStrAny] = None) -> DictStrAny


Returns a dictionary with the resource-scoped state. Resource-scoped state is visible only to resource requesting the access. Dlt state is preserved across pipeline runs and may be used to implement incremental loads.

Note that this function accepts the resource name as an optional argument. In rare cases dlt cannot resolve the resource name because the requesting function runs in a thread other than the main one. You'll need to pass the name explicitly when you request resource_state from async functions or functions decorated with @defer.

Summary: The resource state is a python dictionary-like object that is available within the @dlt.resource decorated functions and may be read and written to. The data within the state is loaded into destination together with any other extracted data and made automatically available to the source/resource extractor functions when they are run next time. When using the state:

  • The resource state is scoped to a particular resource requesting it.
  • Any JSON-serializable values can be written to and then read from the state. dlt dumps and restores instances of Python bytes, DateTime, Date and Decimal types.
  • The state available in the resource decorated function is writable and written values will be available on the next pipeline run


The most typical use case for the state is to implement incremental load.

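import dlt
import requests

def players_games(chess_url, players, start_month=None, end_month=None):
    # archive urls loaded on previous runs, kept in the resource state
    checked_archives = dlt.current.resource_state().setdefault("archives", [])
    # players_archives is a helper defined elsewhere that lists archive urls
    archives = players_archives(chess_url, players)
    for url in archives:
        if url in checked_archives:
            print(f"skipping archive {url}")
            continue
        print(f"getting archive {url}")
        # remember the archive so that the next run skips it
        checked_archives.append(url)
        # get the filtered archive
        r = requests.get(url)
        yield r.json().get("games", [])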

Here we store the URLs of all game archives in the state and skip loading them on the next run. The archives are immutable, so an archive that was already loaded never needs to be fetched again. The state will grow with the coming months (and more players), but it should remain manageable up to a few thousand archives.
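The skip-and-remember pattern can be tried out without a pipeline by letting a plain dictionary stand in for the resource state. In the sketch below, `new_archives` and the sample archive names are hypothetical; with dlt the dictionary would come from `dlt.current.resource_state()` and be persisted between runs.

```python
from typing import Any, Dict, Iterable, Iterator


def new_archives(state: Dict[str, Any], archive_urls: Iterable[str]) -> Iterator[str]:
    # the list lives inside the state, so appending to it updates the state
    checked = state.setdefault("archives", [])
    for url in archive_urls:
        if url in checked:
            # already handled on an earlier run
            continue
        checked.append(url)
        yield url


# a plain dict stands in for the state that is kept across runs
state: Dict[str, Any] = {}
first_run = list(new_archives(state, ["2023/01", "2023/02"]))
second_run = list(new_archives(state, ["2023/01", "2023/02", "2023/03"]))
```

The second run yields only the archive that was not seen before, which is the behaviour an incremental load relies on.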


  • resource_name str, optional - forces the use of the state for a resource with this name. Defaults to None.
  • source_state_ Optional[DictStrAny], optional - Alternative source state. Defaults to None.


  • ResourceNameNotAvailable - Raised if used outside of a resource context or from a thread other than the main one


  • DictStrAny - State dictionary


def reset_resource_state(resource_name: str,
source_state_: Optional[DictStrAny] = None) -> None


Resets the resource state with name resource_name by removing it from source_state


  • resource_name - The resource key to reset
  • source_state_ - Optional source state dictionary to operate on. Use when working outside source context.


def get_dlt_pipelines_dir() -> str


Gets default directory where pipelines' data will be stored

  1. in user home directory ~/.dlt/pipelines/
  2. if current user is root in /var/dlt/pipelines
  3. if current user does not have a home directory in /tmp/dlt/pipelines
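The three cases above can be expressed as a small decision function. This is a sketch only: `default_pipelines_dir` and its parameters are hypothetical, the order in which the checks run is an assumption, and the real function inspects the current user and environment itself.

```python
import posixpath
from typing import Optional


def default_pipelines_dir(home: Optional[str], is_root: bool) -> str:
    # root user: system-wide location
    if is_root:
        return "/var/dlt/pipelines"
    # no home directory available: temporary location
    if not home:
        return "/tmp/dlt/pipelines"
    # regular user: directory under the user's home
    return posixpath.join(home, ".dlt", "pipelines")
```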


def get_dlt_repos_dir() -> str


Gets default directory where command repositories will be stored
