Iceberg
Important Capabilities
Capability | Status | Notes |
---|---|---|
Data Profiling | ✅ | Optionally enabled via configuration. |
Descriptions | ✅ | Enabled by default. |
Detect Deleted Entities | ✅ | Enabled via stateful ingestion. |
Domains | ❌ | Currently not supported. |
Extract Ownership | ✅ | Automatically ingests ownership information from table properties, based on the user_ownership_property and group_ownership_property options. |
Partition Support | ❌ | Currently not supported. |
Platform Instance | ✅ | Optionally enabled via configuration. An Iceberg instance represents the catalog name where the table is stored. |
Integration Details
The DataHub Iceberg source plugin extracts metadata from Iceberg tables stored in a distributed or local file system. Typically, Iceberg tables are stored in a distributed file system like S3 or Azure Data Lake Storage (ADLS) and registered in a catalog. There are various catalog implementations like Filesystem-based, RDBMS-based or even REST-based catalogs. This Iceberg source plugin relies on the pyiceberg library.
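As a rough illustration of what relying on pyiceberg means in practice, the sketch below splits a single-entry catalog mapping into a catalog name and its properties, then hands them to pyiceberg's `load_catalog`. The helper names `split_catalog_config` and `list_tables` are hypothetical and are not part of the DataHub plugin. pyiceberg is imported lazily so the splitting logic can be tried without it installed.

```python
from typing import Any, Dict, List, Tuple


def split_catalog_config(catalog: Dict[str, Dict[str, Any]]) -> Tuple[str, Dict[str, Any]]:
    """Return (catalog_name, properties) from a one-entry catalog mapping."""
    if len(catalog) != 1:
        raise ValueError("expected exactly one catalog specification")
    name, properties = next(iter(catalog.items()))
    return name, dict(properties)


def list_tables(catalog: Dict[str, Dict[str, Any]]) -> List[Tuple[str, ...]]:
    """List table identifiers found in each top-level namespace of the catalog."""
    # Imported here so the rest of the sketch works without pyiceberg installed.
    from pyiceberg.catalog import load_catalog

    name, properties = split_catalog_config(catalog)
    iceberg_catalog = load_catalog(name, **properties)
    tables: List[Tuple[str, ...]] = []
    for namespace in iceberg_catalog.list_namespaces():
        tables.extend(iceberg_catalog.list_tables(namespace))
    return tables
```

Calling `list_tables` needs a reachable catalog, so it is best tried against a local test setup first.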
CLI based Ingestion
Install the Plugin
The iceberg source works out of the box with acryl-datahub.
Starter Recipe
Check out the following recipe to get started with ingestion! See below for full configuration options.
For general pointers on writing and running a recipe, see our main recipe guide.
source:
  type: "iceberg"
  config:
    env: PROD
    catalog:
      # REST catalog configuration example using S3 storage
      my_rest_catalog:
        type: rest
        # Catalog configuration follows pyiceberg's documentation (https://py.iceberg.apache.org/configuration)
        uri: http://localhost:8181
        s3.access-key-id: admin
        s3.secret-access-key: password
        s3.region: us-east-1
        warehouse: s3a://warehouse/wh/
        s3.endpoint: http://localhost:9000
      # SQL catalog configuration example using Azure datalake storage and a PostgreSQL database
      # my_sql_catalog:
      #   type: sql
      #   uri: postgresql+psycopg2://user:password@sqldatabase.postgres.database.azure.com:5432/icebergcatalog
      #   adlfs.tenant-id: <Azure tenant ID>
      #   adlfs.account-name: <Azure storage account name>
      #   adlfs.client-id: <Azure Client/Application ID>
      #   adlfs.client-secret: <Azure Client Secret>
    platform_instance: my_rest_catalog
    table_pattern:
      allow:
        - marketing.*
    profiling:
      enabled: true

sink:
  # sink configs
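The same recipe can also be expressed as a Python dictionary and run through DataHub's `Pipeline` class. The following is a minimal sketch: the helper names `build_recipe` and `run_recipe` are hypothetical, and the `datahub-rest` sink pointing at `http://localhost:8080` is an assumption, since the starter recipe leaves the sink unspecified.

```python
from typing import Any, Dict


def build_recipe(catalog_name: str, catalog_properties: Dict[str, Any]) -> Dict[str, Any]:
    """Build a recipe dictionary mirroring the YAML starter recipe."""
    return {
        "source": {
            "type": "iceberg",
            "config": {
                "env": "PROD",
                "catalog": {catalog_name: catalog_properties},
                "platform_instance": catalog_name,
                "table_pattern": {"allow": ["marketing.*"]},
                "profiling": {"enabled": True},
            },
        },
        # Assumption: a DataHub REST sink on localhost; replace with your own sink.
        "sink": {"type": "datahub-rest", "config": {"server": "http://localhost:8080"}},
    }


def run_recipe(recipe: Dict[str, Any]) -> None:
    """Run the recipe and raise if the pipeline reports failures."""
    # Imported here so build_recipe can be used without acryl-datahub installed.
    from datahub.ingestion.run.pipeline import Pipeline

    pipeline = Pipeline.create(recipe)
    pipeline.run()
    pipeline.raise_from_status()
```

A typical call would be `run_recipe(build_recipe("my_rest_catalog", {"type": "rest", "uri": "http://localhost:8181"}))`, assuming both the catalog and the DataHub server are reachable.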
Config Details
Note that a dot (.) is used to denote nested fields in the YAML recipe.
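To make the dotted notation concrete, here is a small sketch that expands dotted option names into the nested structure a recipe would contain. The function `expand_dotted_keys` is a hypothetical helper written for this illustration, not part of DataHub.

```python
from typing import Any, Dict


def expand_dotted_keys(flat: Dict[str, Any]) -> Dict[str, Any]:
    """Turn {'a.b': 1} into {'a': {'b': 1}}."""
    nested: Dict[str, Any] = {}
    for dotted_key, value in flat.items():
        *parents, leaf = dotted_key.split(".")
        node = nested
        for part in parents:
            # Create intermediate dictionaries on demand.
            node = node.setdefault(part, {})
        node[leaf] = value
    return nested


options = expand_dotted_keys({
    "table_pattern.allow": ["marketing.*"],
    "table_pattern.ignoreCase": True,
    "profiling.enabled": True,
})
print(options)
```

This convention applies to the option names listed in the table. Keys inside the catalog block of the starter recipe, such as s3.region, are literal property names and are not nested.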
Field | Description |
---|---|
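catalog ✅ map(str,object) | Catalog configuration where to find Iceberg tables. Only one catalog specification is supported. The format is the same as pyiceberg's catalog configuration (https://py.iceberg.apache.org/configuration/), where the catalog name is specified as the object name and attributes are set as key-value pairs. |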
group_ownership_property string | Iceberg table property to look for a CorpGroup owner. Can only hold a single group value. If property has no value, no owner information will be emitted. |
ignore_table_errors boolean | Set to true to not produce errors when tables have errors. Default: False |
platform_instance string | The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://datahubproject.io/docs/platform-instances/ for more details. |
processing_threads integer | Number of threads used to process tables. Default: 1 |
user_ownership_property string | Iceberg table property to look for a CorpUser owner. Can only hold a single user value. If property has no value, no owner information will be emitted. Default: owner |
env string | The environment that all assets produced by this connector belong to. Default: PROD |
table_pattern AllowDenyPattern | Regex patterns for tables to filter in ingestion. Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} |
table_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
table_pattern.allow array | List of regex patterns to include in ingestion. Default: ['.*'] |
table_pattern.allow.string string | |
table_pattern.deny array | List of regex patterns to exclude from ingestion. Default: [] |
table_pattern.deny.string string | |
profiling IcebergProfilingConfig | Default: {'enabled': False, 'include_field_null_count': Tru... |
profiling.enabled boolean | Whether profiling should be done. Default: False |
profiling.include_field_max_value boolean | Whether to profile for the max value of numeric columns. Default: True |
profiling.include_field_min_value boolean | Whether to profile for the min value of numeric columns. Default: True |
profiling.include_field_null_count boolean | Whether to profile for the number of nulls for each column. Default: True |
profiling.operation_config OperationConfig | Experimental feature. To specify operation configs. |
profiling.operation_config.lower_freq_profile_enabled boolean | Whether to run profiling at a lower frequency. This does not schedule anything; it only adds checks that decide when profiling should be skipped. Default: False |
profiling.operation_config.profile_date_of_month integer | Number from 1 to 31 (inclusive) for the date of the month. If not specified, it has no value and this field does not take effect. |
profiling.operation_config.profile_day_of_week integer | Number from 0 to 6 (inclusive) for the day of the week. 0 is Monday and 6 is Sunday. If not specified, it has no value and this field does not take effect. |
stateful_ingestion StatefulStaleMetadataRemovalConfig | Iceberg Stateful Ingestion Config. |
stateful_ingestion.enabled boolean | Whether or not to enable stateful ingestion. Enabled by default if a pipeline_name is set and either a datahub-rest sink or datahub_api is specified; otherwise the default is False. |
stateful_ingestion.remove_stale_metadata boolean | Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled. Default: True |
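The table_pattern options above can be read as an allow/deny filter over table names. The sketch below shows one plausible reading, assuming that deny patterns take precedence over allow patterns and that patterns are matched from the start of the name with `re.match`. The function `is_allowed` is a hypothetical stand-in, not DataHub's own implementation.

```python
import re
from typing import List, Optional


def is_allowed(
    name: str,
    allow: Optional[List[str]] = None,
    deny: Optional[List[str]] = None,
    ignore_case: bool = True,
) -> bool:
    """Return True if name matches an allow pattern and no deny pattern."""
    allow = [".*"] if allow is None else allow  # documented default
    deny = [] if deny is None else deny         # documented default
    flags = re.IGNORECASE if ignore_case else 0
    # Assumption: a deny match wins over any allow match.
    if any(re.match(pattern, name, flags) for pattern in deny):
        return False
    return any(re.match(pattern, name, flags) for pattern in allow)


print(is_allowed("marketing.campaigns", allow=["marketing.*"]))
print(is_allowed("sales.orders", allow=["marketing.*"]))
```

Because the patterns are regular expressions, `marketing.*` matches any name that starts with "marketing", not only tables in a namespace called marketing.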
The JSONSchema for this configuration is inlined below.
{
  "title": "IcebergSourceConfig",
  "description": "Base configuration class for stateful ingestion for source configs to inherit from.",
  "type": "object",
  "properties": {
    "env": {
      "title": "Env",
      "description": "The environment that all assets produced by this connector belong to",
      "default": "PROD",
      "type": "string"
    },
    "platform_instance": {
      "title": "Platform Instance",
      "description": "The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://datahubproject.io/docs/platform-instances/ for more details.",
      "type": "string"
    },
    "stateful_ingestion": {
      "title": "Stateful Ingestion",
      "description": "Iceberg Stateful Ingestion Config.",
      "allOf": [
        {
          "$ref": "#/definitions/StatefulStaleMetadataRemovalConfig"
        }
      ]
    },
    "catalog": {
      "title": "Catalog",
      "description": "Catalog configuration where to find Iceberg tables. Only one catalog specification is supported. The format is the same as [pyiceberg's catalog configuration](https://py.iceberg.apache.org/configuration/), where the catalog name is specified as the object name and attributes are set as key-value pairs.",
      "type": "object",
      "additionalProperties": {
        "type": "object"
      }
    },
    "table_pattern": {
      "title": "Table Pattern",
      "description": "Regex patterns for tables to filter in ingestion.",
      "default": {
        "allow": [
          ".*"
        ],
        "deny": [],
        "ignoreCase": true
      },
      "allOf": [
        {
          "$ref": "#/definitions/AllowDenyPattern"
        }
      ]
    },
    "user_ownership_property": {
      "title": "User Ownership Property",
      "description": "Iceberg table property to look for a `CorpUser` owner. Can only hold a single user value. If property has no value, no owner information will be emitted.",
      "default": "owner",
      "type": "string"
    },
    "group_ownership_property": {
      "title": "Group Ownership Property",
      "description": "Iceberg table property to look for a `CorpGroup` owner. Can only hold a single group value. If property has no value, no owner information will be emitted.",
      "type": "string"
    },
    "ignore_table_errors": {
      "title": "Ignore Table Errors",
      "description": "Set to true to not produce errors when tables have errors.",
      "default": false,
      "type": "boolean"
    },
    "profiling": {
      "title": "Profiling",
      "default": {
        "enabled": false,
        "include_field_null_count": true,
        "include_field_min_value": true,
        "include_field_max_value": true,
        "operation_config": {
          "lower_freq_profile_enabled": false,
          "profile_day_of_week": null,
          "profile_date_of_month": null
        }
      },
      "allOf": [
        {
          "$ref": "#/definitions/IcebergProfilingConfig"
        }
      ]
    },
    "processing_threads": {
      "title": "Processing Threads",
      "description": "How many threads will be processing tables",
      "default": 1,
      "type": "integer"
    }
  },
  "required": [
    "catalog"
  ],
  "additionalProperties": false,
  "definitions": {
    "DynamicTypedStateProviderConfig": {
      "title": "DynamicTypedStateProviderConfig",
      "type": "object",
      "properties": {
        "type": {
          "title": "Type",
          "description": "The type of the state provider to use. For DataHub use `datahub`",
          "type": "string"
        },
        "config": {
          "title": "Config",
          "description": "The configuration required for initializing the state provider. Default: The datahub_api config if set at pipeline level. Otherwise, the default DatahubClientConfig. See the defaults (https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/graph/client.py#L19).",
          "default": {},
          "type": "object"
        }
      },
      "required": [
        "type"
      ],
      "additionalProperties": false
    },
    "StatefulStaleMetadataRemovalConfig": {
      "title": "StatefulStaleMetadataRemovalConfig",
      "description": "Base specialized config for Stateful Ingestion with stale metadata removal capability.",
      "type": "object",
      "properties": {
        "enabled": {
          "title": "Enabled",
          "description": "Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or `datahub_api` is specified, otherwise False",
          "default": false,
          "type": "boolean"
        },
        "remove_stale_metadata": {
          "title": "Remove Stale Metadata",
          "description": "Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.",
          "default": true,
          "type": "boolean"
        }
      },
      "additionalProperties": false
    },
    "AllowDenyPattern": {
      "title": "AllowDenyPattern",
      "description": "A class to store allow deny regexes",
      "type": "object",
      "properties": {
        "allow": {
          "title": "Allow",
          "description": "List of regex patterns to include in ingestion",
          "default": [
            ".*"
          ],
          "type": "array",
          "items": {
            "type": "string"
          }
        },
        "deny": {
          "title": "Deny",
          "description": "List of regex patterns to exclude from ingestion.",
          "default": [],
          "type": "array",
          "items": {
            "type": "string"
          }
        },
        "ignoreCase": {
          "title": "Ignorecase",
          "description": "Whether to ignore case sensitivity during pattern matching.",
          "default": true,
          "type": "boolean"
        }
      },
      "additionalProperties": false
    },
    "OperationConfig": {
      "title": "OperationConfig",
      "type": "object",
      "properties": {
        "lower_freq_profile_enabled": {
          "title": "Lower Freq Profile Enabled",
          "description": "Whether to do profiling at lower freq or not. This does not do any scheduling just adds additional checks to when not to run profiling.",
          "default": false,
          "type": "boolean"
        },
        "profile_day_of_week": {
          "title": "Profile Day Of Week",
          "description": "Number between 0 to 6 for day of week (both inclusive). 0 is Monday and 6 is Sunday. If not specified, defaults to Nothing and this field does not take affect.",
          "type": "integer"
        },
        "profile_date_of_month": {
          "title": "Profile Date Of Month",
          "description": "Number between 1 to 31 for date of month (both inclusive). If not specified, defaults to Nothing and this field does not take affect.",
          "type": "integer"
        }
      },
      "additionalProperties": false
    },
    "IcebergProfilingConfig": {
      "title": "IcebergProfilingConfig",
      "type": "object",
      "properties": {
        "enabled": {
          "title": "Enabled",
          "description": "Whether profiling should be done.",
          "default": false,
          "type": "boolean"
        },
        "include_field_null_count": {
          "title": "Include Field Null Count",
          "description": "Whether to profile for the number of nulls for each column.",
          "default": true,
          "type": "boolean"
        },
        "include_field_min_value": {
          "title": "Include Field Min Value",
          "description": "Whether to profile for the min value of numeric columns.",
          "default": true,
          "type": "boolean"
        },
        "include_field_max_value": {
          "title": "Include Field Max Value",
          "description": "Whether to profile for the max value of numeric columns.",
          "default": true,
          "type": "boolean"
        },
        "operation_config": {
          "title": "Operation Config",
          "description": "Experimental feature. To specify operation configs.",
          "allOf": [
            {
              "$ref": "#/definitions/OperationConfig"
            }
          ]
        }
      },
      "additionalProperties": false
    }
  }
}
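The OperationConfig fields in the schema describe checks on when profiling is skipped rather than a scheduler. One way to read them is sketched below, assuming the checks only apply when lower_freq_profile_enabled is true and that an unset field imposes no restriction. The function `should_profile` is hypothetical and may differ from the plugin's actual logic.

```python
from datetime import date
from typing import Optional


def should_profile(
    today: date,
    lower_freq_profile_enabled: bool = False,
    profile_day_of_week: Optional[int] = None,
    profile_date_of_month: Optional[int] = None,
) -> bool:
    """Decide whether profiling would run on the given date."""
    if not lower_freq_profile_enabled:
        return True  # no extra checks are applied
    # date.weekday() uses the same convention as the option: 0 is Monday, 6 is Sunday.
    if profile_day_of_week is not None and today.weekday() != profile_day_of_week:
        return False
    if profile_date_of_month is not None and today.day != profile_date_of_month:
        return False
    return True


# 2024-01-01 was a Monday, so a Monday-only configuration profiles on that day.
print(should_profile(date(2024, 1, 1), True, profile_day_of_week=0))
```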
Concept Mapping
This ingestion source maps the following Source System Concepts to DataHub Concepts:
Source Concept | DataHub Concept | Notes |
---|---|---|
iceberg | Data Platform | |
Table | Dataset | An Iceberg table is registered inside a catalog using a name, where the catalog is responsible for creating, dropping and renaming tables. Catalogs manage a collection of tables that are usually grouped into namespaces. The name of a table is mapped to a Dataset name. If a Platform Instance is configured, it will be used as a prefix: <platform_instance>.my.namespace.table. |
Table property | User (a.k.a. CorpUser) | The value of a table property can be used as the name of a CorpUser owner. This table property name can be configured with the source option user_ownership_property. |
Table property | CorpGroup | The value of a table property can be used as the name of a CorpGroup owner. This table property name can be configured with the source option group_ownership_property. |
Table parent folders (excluding warehouse catalog location) | Container | Available in a future release |
Table schema | SchemaField | Maps to the fields defined within the Iceberg table schema definition. |
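The table-to-dataset naming rule in the mapping above can be sketched as follows. Both helpers are hypothetical and only format strings. The URN layout shown follows DataHub's general dataset URN convention, and treating the prefixed name as the URN's name component is an assumption based on the note in the table.

```python
from typing import Optional


def dataset_name(namespace: str, table: str, platform_instance: Optional[str] = None) -> str:
    """Build the dataset name, prefixed with the platform instance if one is set."""
    name = f"{namespace}.{table}"
    return f"{platform_instance}.{name}" if platform_instance else name


def dataset_urn(name: str, env: str = "PROD") -> str:
    """Format a dataset URN for the iceberg data platform."""
    return f"urn:li:dataset:(urn:li:dataPlatform:iceberg,{name},{env})"


name = dataset_name("my.namespace", "table", platform_instance="my_rest_catalog")
print(name)
print(dataset_urn(name))
```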
Troubleshooting
Exceptions while increasing processing_threads
Each processing thread opens several files and sockets to download manifest files from blob storage. If you see exceptions after increasing the processing_threads configuration parameter, try raising the limit on open files (e.g. using ulimit on Linux).
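Besides ulimit, the open-file limit can be inspected and raised from Python on Unix-like systems with the standard `resource` module. This is a minimal sketch: `raise_open_file_limit` is a hypothetical helper, it only changes the limit for the current process, and some platforms may reject values that the reported hard limit appears to allow.

```python
import resource


def raise_open_file_limit(target: int) -> int:
    """Raise the soft open-file limit toward target and return the resulting soft limit."""
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if hard != resource.RLIM_INFINITY:
        # An unprivileged process cannot exceed the hard limit.
        target = min(target, hard)
    if soft == resource.RLIM_INFINITY or soft >= target:
        return soft  # already high enough, nothing to change
    resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard))
    return target


soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"soft={soft_limit} hard={hard_limit}")
```

Calling the helper before starting ingestion in the same process, for example `raise_open_file_limit(4096)`, is one option; the value 4096 is only an illustration.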
Code Coordinates
- Class Name: datahub.ingestion.source.iceberg.iceberg.IcebergSource
- Browse on GitHub
Questions
If you've got any questions on configuring ingestion for Iceberg, feel free to ping us on our Slack.