🧪 Schema and data contracts
dlt will evolve the schema at the destination by following the structure and data types of the extracted data. There are several modes
that you can use to control this automatic schema evolution, from the default modes where all changes to the schema are accepted to
a frozen schema that does not change at all.
Consider this example:
import dlt

@dlt.resource(schema_contract={"tables": "evolve", "columns": "freeze"})
def items():
    ...
This resource allows new tables (both nested tables and tables with dynamic names) to be created, but raises an exception if the data extracted for an existing table contains a new column.
Setting up the contract
You can control the following schema entities:
- tables: the contract is applied when a new table is created.
- columns: the contract is applied when a new column is created on an existing table.
- data_type: the contract is applied when data cannot be coerced into a data type associated with an existing column.
You can use contract modes to tell dlt how to apply the contract for a particular entity:
- evolve: No constraints on schema changes.
- freeze: This will raise an exception if data is encountered that does not fit the existing schema, so no data will be loaded to the destination.
- discard_row: This will discard any extracted row if it does not adhere to the existing schema, and this row will not be loaded to the destination.
- discard_value: This will discard data in an extracted row that does not adhere to the existing schema, and the row will be loaded without this data.
The default mode (evolve) works as follows:
- New tables may always be created.
- New columns may always be appended to the existing table.
- Data that do not coerce to the existing data type of a particular column will be sent to a variant column created for this particular type.
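The four contract modes, applied to the columns entity, can be illustrated with a minimal sketch. It does not use dlt's internals: `apply_column_contract` and `ContractViolation` are hypothetical names introduced here, and the logic only mirrors the behavior described above for a single row that carries an unknown column.

```python
from typing import Any, Dict, Optional, Set


class ContractViolation(Exception):
    """Hypothetical stand-in for the error raised in freeze mode."""


def apply_column_contract(
    row: Dict[str, Any], known_columns: Set[str], mode: str
) -> Optional[Dict[str, Any]]:
    """Return the row that would be loaded, or None if the row is discarded."""
    new_columns = set(row) - known_columns
    if not new_columns or mode == "evolve":
        # Nothing unexpected, or the schema is allowed to grow: keep the row as is.
        return dict(row)
    if mode == "freeze":
        # No data is loaded; the violation is reported to the caller.
        raise ContractViolation(f"unexpected columns: {sorted(new_columns)}")
    if mode == "discard_row":
        # The whole row is dropped.
        return None
    if mode == "discard_value":
        # Only the offending values are dropped; the rest of the row is kept.
        return {key: value for key, value in row.items() if key in known_columns}
    raise ValueError(f"unknown contract mode: {mode}")


known = {"id", "name"}
row = {"id": 1, "name": "a", "email": "a@example.com"}

evolved = apply_column_contract(row, known, "evolve")
dropped = apply_column_contract(row, known, "discard_row")
trimmed = apply_column_contract(row, known, "discard_value")
```

Here `evolved` keeps all three keys, `dropped` is `None`, and `trimmed` keeps only `id` and `name`; calling the function with `"freeze"` raises the hypothetical `ContractViolation`.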
Passing the schema_contract argument
The schema_contract argument exists on the dlt.source decorator as a default for all resources in that source, and on the dlt.resource decorator as a directive for the individual resource and, as a consequence, for all tables created by this resource.
Additionally, it exists on the pipeline.run() method, which will override all existing settings.
The schema_contract argument accepts two forms:
- full: a mapping of schema entities to contract modes
- shorthand: a contract mode (string) that will be applied to all schema entities.
For example, setting schema_contract to freeze will expand to the full form:
{"tables": "freeze", "columns": "freeze", "data_type": "freeze"}
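The expansion from shorthand to full form can be sketched in a few lines. This is an illustration only, not dlt's implementation: `expand_contract` is a hypothetical helper, and filling unspecified entities with evolve is an assumption based on evolve being the default mode.

```python
from typing import Dict, Union

SCHEMA_ENTITIES = ("tables", "columns", "data_type")
CONTRACT_MODES = ("evolve", "freeze", "discard_row", "discard_value")


def expand_contract(contract: Union[str, Dict[str, str]]) -> Dict[str, str]:
    """Expand a shorthand (string) or partial (mapping) contract to the full form."""
    if isinstance(contract, str):
        # Shorthand: the same mode applies to every schema entity.
        contract = {entity: contract for entity in SCHEMA_ENTITIES}
    for entity, mode in contract.items():
        if entity not in SCHEMA_ENTITIES:
            raise ValueError(f"unknown schema entity: {entity}")
        if mode not in CONTRACT_MODES:
            raise ValueError(f"unknown contract mode: {mode}")
    # Assumption: entities that are not mentioned fall back to the default, evolve.
    return {entity: contract.get(entity, "evolve") for entity in SCHEMA_ENTITIES}


full = expand_contract("freeze")
partial = expand_contract({"columns": "freeze"})
```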
You can change the contract on a source instance via its schema_contract property. For a resource, use apply_hints.
Nuances of contract modes
- Contracts are applied after names of tables and columns are normalized.
- A contract defined on a resource is applied to all root tables and nested tables created by that resource.
- discard_row works on the table level. For example, if you have two tables in a nested relationship, i.e., users and users__addresses, and the contract is violated in the users__addresses table, the row of that table is discarded while the parent row in the users table will be loaded.
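The table-level behavior of discard_row can be illustrated with a small sketch. The helper `discard_violating_rows`, the `user_id` link column, and the sample rows are all hypothetical; the point is only that each table is filtered on its own, so a rejected nested row does not remove its parent.

```python
from typing import Any, Dict, List, Set

Rows = List[Dict[str, Any]]


def discard_violating_rows(
    rows_by_table: Dict[str, Rows], known_columns: Dict[str, Set[str]]
) -> Dict[str, Rows]:
    """Drop rows with unknown columns, evaluating each table independently."""
    kept: Dict[str, Rows] = {}
    for table, rows in rows_by_table.items():
        allowed = known_columns[table]
        kept[table] = [row for row in rows if set(row) <= allowed]
    return kept


rows_by_table = {
    "users": [{"id": 1, "name": "alice"}],
    "users__addresses": [
        {"user_id": 1, "city": "Berlin"},
        {"user_id": 1, "city": "Paris", "zip": "75001"},  # "zip" is not in the schema
    ],
}
known_columns = {
    "users": {"id", "name"},
    "users__addresses": {"user_id", "city"},
}

kept = discard_violating_rows(rows_by_table, known_columns)
```

Only the second address row is dropped; the parent row in users is still present in `kept`.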
Use Pydantic models for data validation
Pydantic models can be used to define table schemas and validate incoming data. You can use any model you already have. dlt will internally synthesize (if necessary) new models that conform to the schema contract on the resource.
Just passing a model in the columns argument of dlt.resource sets a schema contract that conforms to the default Pydantic behavior:
{
  "tables": "evolve",
  "columns": "discard_value",
  "data_type": "freeze"
}
New tables are allowed, extra fields are ignored, and invalid data raises an exception.
If you pass a schema contract explicitly, the following happens to schema entities:
- tables do not impact the Pydantic models.
- columns modes are mapped into the extra modes of Pydantic (see below). dlt will apply this setting recursively if models contain other models.
- data_type supports the following modes for Pydantic:
  - evolve will synthesize a lenient model that allows for any data type. This may result in variant columns upstream.
  - freeze will re-raise ValidationException.
  - discard_row will remove the non-validating data items.
  - discard_value is not currently supported. We may eventually do that in Pydantic v2.
dlt maps column contract modes into the extra fields settings as follows.
Note that this works in two directions. If you use a model with such a setting explicitly configured, dlt sets the column contract mode accordingly. This also avoids synthesizing modified models.
| column mode | pydantic extra |
| --- | --- |
| evolve | allow |
| freeze | forbid |
| discard_value | ignore |
| discard_row | forbid |
discard_row requires additional handling when a ValidationError is raised.
Model validation is added as a transform step to the resource. This step will convert the incoming data items into instances of validating models. You can convert them back to dictionaries by using add_map(lambda item: item.dict()) on a resource (in Pydantic v2, item.model_dump() replaces the deprecated item.dict()).
Pydantic models work on the extracted data before names are normalized or nested tables are created. Make sure to name model fields as in your input data and handle nested data with nested models.
As a consequence, discard_row will drop the whole data item, even if only a nested model was affected.
Set contracts on Arrow tables and Pandas
All contract settings apply to Arrow tables and pandas DataFrames as well.
- tables mode is the same, no matter what the data item type is.
- columns will allow new columns, raise an exception, or modify tables/frames still in the extract step to avoid rewriting Parquet files.
- data_type changes to data types in tables/frames are not allowed and will result in a data type schema clash. We could allow for more modes (evolving data types in Arrow tables sounds weird, but ping us on Slack if you need it).
Here's how dlt deals with column modes:
- evolve: new columns are allowed (the table may be reordered to put them at the end).
- discard_value: the new column will be deleted.
- discard_row: rows with the new column present will be deleted, and then the column will be deleted.
- freeze: an exception is raised on a new column.
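The column modes for tabular data can be sketched on a plain columnar dictionary, without Arrow or pandas. This is an illustration under stated assumptions, not dlt's implementation: `apply_columns_mode` is a hypothetical helper, and "rows with the column present" is interpreted here as rows where the new column holds a non-null value.

```python
from typing import Any, Dict, List, Set

Columnar = Dict[str, List[Any]]


def apply_columns_mode(table: Columnar, known: Set[str], mode: str) -> Columnar:
    """Apply a columns contract mode to a column-oriented table."""
    new_cols = [name for name in table if name not in known]
    if not new_cols or mode == "evolve":
        # Keep everything; known columns first, new columns moved to the end.
        ordered = [name for name in table if name in known] + new_cols
        return {name: list(table[name]) for name in ordered}
    if mode == "freeze":
        raise ValueError(f"new columns are not allowed: {new_cols}")
    if mode == "discard_value":
        # Drop the new columns, keep all rows.
        return {name: list(values) for name, values in table.items() if name in known}
    if mode == "discard_row":
        # Assumption: a row "has" the new column when its value is not None.
        n_rows = len(next(iter(table.values()), []))
        keep = [i for i in range(n_rows) if all(table[c][i] is None for c in new_cols)]
        return {
            name: [values[i] for i in keep]
            for name, values in table.items()
            if name in known
        }
    raise ValueError(f"unknown contract mode: {mode}")


table = {"id": [1, 2, 3], "extra": [None, "x", None], "name": ["a", "b", "c"]}
known = {"id", "name"}

evolved = apply_columns_mode(table, known, "evolve")
without_column = apply_columns_mode(table, known, "discard_value")
without_rows = apply_columns_mode(table, known, "discard_row")
```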
Get context from DataValidationError in freeze mode
When a contract is violated in freeze mode, dlt raises a DataValidationError exception. This exception provides access to the full context and passes the evidence to the caller.
As with any other exception coming from a pipeline run, it will be re-raised via a PipelineStepFailed exception, which you should catch in an except block:
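# Import paths are assumed here; check them against your installed dlt version.
from dlt.common.schema.exceptions import DataValidationError
from dlt.pipeline.exceptions import PipelineStepFailed

try:
    pipeline.run()
except PipelineStepFailed as pip_ex:
    # In the normalize step, the validation error sits two levels down the context chain.
    if pip_ex.step == "normalize":
        if isinstance(pip_ex.__context__.__context__, DataValidationError):
            ...
    # In the extract step, the validation error is the direct context.
    if pip_ex.step == "extract":
        if isinstance(pip_ex.__context__, DataValidationError):
            ...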
DataValidationError provides the following context:
- schema_name, table_name, and column_name provide the logical "location" at which the contract was violated.
- schema_entity and contract_mode indicate which contract was violated.
- table_schema contains the schema against which the contract was validated. It may be a Pydantic model or a dlt TTableSchema instance.
- schema_contract is the full, expanded schema contract.
- data_item is the causing data item (Python dict, Arrow table, Pydantic model, or list thereof).
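Because the depth of the wrapped exception differs between steps, it can be convenient to walk the exception chain instead of hard-coding the number of `__context__` hops. The sketch below uses only standard Python exception chaining; `find_in_context` and the two `Fake...` exception classes are hypothetical stand-ins so the example runs without dlt.

```python
from typing import Optional, Set, Type, TypeVar

E = TypeVar("E", bound=BaseException)


def find_in_context(exc: BaseException, exc_type: Type[E]) -> Optional[E]:
    """Return the first exception of exc_type found in the cause/context chain."""
    seen: Set[int] = set()
    current: Optional[BaseException] = exc
    while current is not None and id(current) not in seen:
        if isinstance(current, exc_type):
            return current
        seen.add(id(current))  # guard against cycles in the chain
        current = current.__cause__ or current.__context__
    return None


class FakeValidationError(Exception):
    """Stand-in for a contract violation."""


class FakeStepFailed(Exception):
    """Stand-in for the outer pipeline step failure."""


found = None
try:
    try:
        try:
            raise FakeValidationError("column 'email' not allowed")
        except FakeValidationError:
            # Raising inside an except block sets __context__ implicitly.
            raise RuntimeError("worker failed")
    except RuntimeError:
        raise FakeStepFailed("normalize")
except FakeStepFailed as step_ex:
    found = find_in_context(step_ex, FakeValidationError)
```

With the real classes, the same helper could be called as `find_in_context(pip_ex, DataValidationError)` inside the except block, regardless of which step failed.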
Contracts on new tables
If a table is a new table that has not been created on the destination yet, dlt will allow the creation of new columns. For a single pipeline run, the column mode is changed (internally) to evolve and then reverted back to the original mode. This allows for initial schema inference to happen, and then on subsequent runs, the inferred contract will be applied to the new data.
The following tables are considered new:
- Child tables inferred from nested data.
- Dynamic tables created from the data during extraction.
- Tables containing incomplete columns - columns without a data type bound to them.
For example, such a table is considered new because the column number is incomplete (defined as primary key and NOT null but no data type):
blocks:
  description: Ethereum blocks
  write_disposition: append
  columns:
    number:
      nullable: false
      primary_key: true
      name: number
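The "incomplete column" rule can be expressed as a small check over a table definition. This is a sketch, not dlt's code: `has_incomplete_columns` is a hypothetical helper, the table is modeled as a plain dictionary mirroring the YAML above, and the key name `data_type` for the bound type is an assumption.

```python
from typing import Any, Dict


def has_incomplete_columns(table: Dict[str, Any]) -> bool:
    """Return True if any column in the table definition lacks a data type."""
    columns = table.get("columns", {})
    return any("data_type" not in column for column in columns.values())


# Same shape as the YAML example: hints are present, but no data type is bound.
blocks = {
    "description": "Ethereum blocks",
    "write_disposition": "append",
    "columns": {
        "number": {"nullable": False, "primary_key": True, "name": "number"},
    },
}

# The same column once a data type has been bound to it.
blocks_complete = {
    "columns": {
        "number": {"name": "number", "data_type": "bigint", "nullable": False},
    },
}

is_new = has_incomplete_columns(blocks)
is_new_after_inference = has_incomplete_columns(blocks_complete)
```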
Tables that are not considered new:
- Those with columns defined by Pydantic models.
Working with datasets that have manually added tables and columns on the first load
In some cases, you might be working with datasets that have tables or columns created outside of dlt. If you are loading to a table not created by dlt for the first time, dlt will not know about this table while enforcing schema contracts. This means that if you do a load where tables is set to evolve, all will work as planned. If you have tables set to freeze, dlt will raise an exception because it thinks you are creating a new table (which you are from dlt's perspective). You can allow evolve for one load and then switch back to freeze.
The same thing will happen if dlt knows your table, but you have manually added a column to your destination and you have columns set to freeze.
Code examples
The below code will silently ignore new subtables, allow new columns to be added to existing tables, and raise an error if a variant of a column is discovered.
@dlt.resource(schema_contract={"tables": "discard_row", "columns": "evolve", "data_type": "freeze"})
def items():
    ...
The below code will raise an error on any encountered schema change. Note: You can always set a string which will be interpreted as though all keys are set to these values.
pipeline.run(my_source(), schema_contract="freeze")
The below code defines some settings on the source which can be overwritten on the resource, which in turn can be overwritten by the global override on the run method.
Here, for all resources, variant columns are frozen and raise an error if encountered. On items, new columns are allowed, but other_items inherits the freeze setting from the source, thus new columns are frozen there. New tables are allowed.
@dlt.resource(schema_contract={"columns": "evolve"})
def items():
    ...

@dlt.resource()
def other_items():
    ...

@dlt.source(schema_contract={"columns": "freeze", "data_type": "freeze"})
def source():
    return [items(), other_items()]

# this will use the settings defined by the decorators
pipeline.run(source())

# this will freeze the whole schema, regardless of the decorator settings
pipeline.run(source(), schema_contract="freeze")
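The layering in this last example (source default, resource directive, run-level override) can be modeled as a per-entity merge. The sketch below is only a mental model, not dlt's resolution logic: `effective_contract` is a hypothetical helper, and treating each layer as overriding just the entities it names is an assumption consistent with the outcome described above.

```python
from typing import Dict, Optional, Union

Contract = Union[str, Dict[str, str]]
ENTITIES = ("tables", "columns", "data_type")


def _as_mapping(contract: Optional[Contract]) -> Dict[str, str]:
    """Turn None, a shorthand string, or a partial mapping into a mapping."""
    if contract is None:
        return {}
    if isinstance(contract, str):
        return {entity: contract for entity in ENTITIES}
    return dict(contract)


def effective_contract(
    source: Optional[Contract] = None,
    resource: Optional[Contract] = None,
    run_override: Optional[Contract] = None,
) -> Dict[str, str]:
    """Merge contract layers; later layers win for the entities they name."""
    merged = {entity: "evolve" for entity in ENTITIES}  # default mode
    for layer in (source, resource, run_override):
        merged.update(_as_mapping(layer))
    return merged


source_contract = {"columns": "freeze", "data_type": "freeze"}

items_contract = effective_contract(source_contract, {"columns": "evolve"})
other_items_contract = effective_contract(source_contract)
frozen_contract = effective_contract(source_contract, {"columns": "evolve"}, "freeze")
```

Under these assumptions, items ends up with evolving columns and frozen data types, other_items inherits frozen columns from the source, and the run-level "freeze" overrides every entity.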