我有一个简单的Delta Live Tables管道,它将多个csv文件从cloudFiles(s3存储)流式读取到发布到蜂巢元存储的Delta表中。
我有两个要求使我的情况更加复杂/独特:
- 由于csv文件的格式,我需要将
skipRows
参数用于autoLoader。这需要使用Databricks运行时的预览通道(编写时为v11.3)。source - 我需要将表
columnMapping.mode
属性设置为name
,因为csv数据的列名中包含Delta/Parquet本机不允许的字符。来源
以上两个功能似乎都是预览/测试功能,所以我观察到的行为可能是一个bug。
我的管道定义如下:
import dlt from pyspark.sql.functions import * from pyspark.sql.types import * s3_url = "s3://<path_to_csvs>" @dlt.table( comment="...", table_properties={ 'delta.minReaderVersion' : '2', 'delta.minWriterVersion' : '5', 'delta.columnMapping.mode' : 'name', 'quality': 'bronze' } ) def bronze_my_csv_data_raw(): return ( spark.readStream.format("cloudFiles") .option("skipRows", 1) .option("header", "true") .option("cloudFiles.includeExistingFiles", "true") .option("cloudFiles.format", "csv") .option("cloudFiles.schemaEvolutionMode", "addNewColumns") .option("pathGlobFilter", "*.csv") .load(s3_url) )
当第一次设置并运行管道时,这会按预期工作,但在进行更改并运行管道的“完全刷新所有”(以刷新所有数据)时,我会收到以下错误:
com.databricks.sql.transaction.tahoe.DeltaColumnMappingUnsupportedException: Schema change is detected: old schema: root new schema: root |-- TIMESTAMP: string (nullable = true) |-- RECORD: string (nullable = true) |-- Samples_Max: string (nullable = true) ... Schema changes are not allowed during the change of column mapping mode.
即使更改目标表名以创建新的空表,也会发生这种情况。一旦发生,即使在常规(非完全刷新)运行中,也会发生相同的错误。
如有任何帮助,将不胜感激