我有一个简单的Delta Live Tables管道,它将多个csv文件从cloudFiles(s3存储)流式读取到发布到蜂巢元存储的Delta表中。

我有两个要求使我的情况更加复杂/独特:

  1. 由于csv文件的格式,我需要将skipRows参数用于autoLoader。这需要使用Databricks运行时的预览通道(编写时为v11.3)。source
  2. 我需要将表columnMapping.mode属性设置为name,因为csv数据的列名中包含Delta/Parquet本机不允许的字符。来源

以上两个功能似乎都是预览/测试功能,所以我观察到的行为可能是一个bug。

我的管道定义如下:

import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *

s3_url = "s3://<path_to_csvs>"

@dlt.table(
    comment="...",
    table_properties={
        'delta.minReaderVersion' : '2', 
        'delta.minWriterVersion' : '5', 
        'delta.columnMapping.mode' : 'name',
        'quality': 'bronze'
    }
)
def bronze_my_csv_data_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("skipRows", 1)
      .option("header", "true")
      .option("cloudFiles.includeExistingFiles", "true")
      .option("cloudFiles.format", "csv")
      .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
      .option("pathGlobFilter", "*.csv")
      .load(s3_url)
  )

当第一次设置并运行管道时,这会按预期工作,但在进行更改并运行管道的“完全刷新所有”(以刷新所有数据)时,我会收到以下错误:

com.databricks.sql.transaction.tahoe.DeltaColumnMappingUnsupportedException: 
Schema change is detected:

old schema:
root


new schema:
root
 |-- TIMESTAMP: string (nullable = true)
 |-- RECORD: string (nullable = true)
 |-- Samples_Max: string (nullable = true)
 ...

Schema changes are not allowed during the change of column mapping mode.

即使更改目标表名以创建新的空表,也会发生这种情况。一旦发生,即使在常规(非完全刷新)运行中,也会发生相同的错误。

如有任何帮助,将不胜感激

Delta Live表完全刷新时不允许架构更改的更多相关文章

  1. pyspark自定义UDAF函数调用报错问题解决

    这篇文章主要为大家介绍了pyspark自定义UDAF函数调用报错问题解决,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  2. 尝试在同一pyspark结构化流作业中写入和读取增量表。可以&#39;看不到数据

    PySpark作业是否可以在增量表中写入,并在同一代码中从该表中读取?使用以下命令运行代码时不会出错。我正在尝试将我要刷新到Kafka的数据可视化到Delta表中,以确保数据流动良好,底层组件也工作良好。即使向我的主题发送了流量,我也可以看到一个空表。任何形式的帮助都会有帮助。

  3. regexp_extract返回不期望的结果

    测试代码:返回:正如你从上面看到的,标题中的2个给出了错误的结果。1013BriarLane是正确的,因为它没有返回任何内容,因为1013不是年份这是我的代码:在标题子字符串中获取正确的年份

  4. Delta Live表完全刷新时不允许架构更改

    我有一个简单的DeltaLiveTables管道,它将多个csv文件从cloudFiles流式读取到发布到蜂巢元存储的Delta表中。我有两个要求使我的情况更加复杂/独特:由于csv文件的格式,我需要将skipRows参数用于autoLoader。这需要使用Databricks运行时的预览通道。source我需要将表columnMapping.mode属性设置为name,因为csv数据的列名中包含Delta/Parquet本机不允许的字符。一旦发生,即使在常规运行中,也会发生相同的错误。如有任何帮助,将不

  5. 在Spark Web UI中检查FAIR调度程序的池统计信息的位置

    我看到我的Spark应用程序正在使用FAIR调度程序:但我无法确认它是否使用了我设置的两个池。下面是我在PySpark中实现的线程函数我以为“阶段”菜单应该显示游泳池信息,但我没有看到。这是否意味着游泳池设置不正确,还是我看错了地方?我在EMR6.9.0之上使用PySpark3.3.0

  6. HashPartitioning数据帧以在PySpark中的连接期间实现联合分区

    为了实现#2,我应该尝试通过对两个数据帧的键进行共同分区,尽可能减少混洗。为了改进我的加入,我首先对event_type进行过滤,以缩小两个数据帧上的数据范围。然后我在day和event_id上执行实际的连接。我读到repartition在指定的列上实现了哈希分区。我将数据帧保存到磁盘上,还包括一个partitionBy,以便在过滤/分组操作上获得更好的性能。我甚至不确定同时使用repartition和partitionBy是正确的方法。当我从磁盘重新读取拼花地板文件时,使用repartition()的初

  7. Pyspark:异常:Java网关进程在发送驱动程序的端口号之前退出

    但问题从未得到解决.请帮忙!

  8. 当将JSON文件读入Spark时,python – _corrupt_record错误

    我有这个JSON文件这是使用Pythonjson.dump方法获得的.现在,我想使用pyspark将此文件读入Spark中的DataFrame.以下文档,我在这样做sc=SparkContext()sqlc=sqlContextdf=sqlc.read.jsonprintdf.show()打印声明:任何人都知道发生了什么,为什么不正确地解释文件?

随机推荐

  1. 如何扩展ATmega324PB微控制器的以下宏寄存器?

    我目前正在学习嵌入式,我有以下练习:展开以下宏寄存器:如果有人解决了这个问题,我将不胜感激,以便将来参考

  2. Python将ONNX运行时设置为返回张量而不是numpy数组

    在python中,我正在加载预定义的模型:然后我加载一些数据并运行它:到目前为止,它仍在正常工作,但我希望它默认返回Tensor列表,而不是numpy数组。我对ONNX和PyTorch都是新手,我觉得这是我在这里缺少的基本内容。这将使转换中的一些开销相同。

  3. 在macOS上的终端中使用Shell查找文件中的单词

    我有一个文本文件,其中有一行:我需要找到ID并将其提取到变量中。我想出了一个RexEx模式:但它似乎对我尝试过的任何东西都不起作用:grep、sed——不管怎样。我的一个尝试是:我为这样一个看似愚蠢的问题感到抱歉,但我在互联网上找不到任何东西:我在SO和SE上读了几十个类似的问题,并在谷歌上搜索了几个教程,但仍然无法找到答案。欢迎提供任何指导!

  4. react-chartjs-2甜甜圈图中只有标题未更新

    我正在使用react-chartjs-2在我的网站中实现甜甜圈图。下面是我用来呈现图表的代码。我将甜甜圈图的详细信息从父组件传递到子组件,所有道具都正确传递。当我在beforeDraw函数外部记录props.title时,它会记录正确的值,但当我在beforeDraw函数内部记录props.title时,它将记录标题的前一个值,从而呈现标题的前值。我在这里做错了什么?

  5. 如何在tkinter中使用Python生成器函数?

    生成器函数承诺使某些代码更易于编写。但我并不总是知道如何使用它们。假设我有一个斐波那契生成器函数fib(),我想要一个显示第一个结果的tkinter应用程序。当我点击“下一步”按钮时,它会显示第二个数字,依此类推。我如何构建应用程序来实现这一点?我可能需要在线程中运行生成器。但如何将其连接回GUI?

  6. 如何为每次提交将存储库历史记录拆分为一行?

    我正在尝试获取存储库的历史记录,但结果仅以单行文本的形式返回给我。

  7. 尝试在颤振项目上初始化Firebase时出错

    当尝试在我的颤振项目上初始化firebase时,我收到了这个错误有人知道我能做什么吗?应用程序分级Gradle插件Gradle项目颤振相关性我已经将firebase设置为Google文档已经在另一个模拟器上尝试过,已经尝试过创建一个全新的模拟器,已经在不同的设备上尝试过了,已经尝试了特定版本的firebase,已经尝试添加但没有任何效果,已经在youtube上看到了关于它的每一个视频,该应用程序在android和iOS两个平台上都抛出了这个错误

  8. 在unix中基于当前日期添加新列

    我试图在unix中基于时间戳列在最后一个单元格中添加一个状态列。我不确定如何继续。

  9. 麦克斯·蒙特利。我一直得到UncaughtReferenceError:当我在终端中写入node-v时,节点未定义

    如果这是您应该知道的,请确认:我已将所有shell更改为默认为zsh。当我在终端中写入node-v时,我一直收到“UncaughtReferenceError:nodeisnotdefined”。但它显示节点已安装。我是个新手,在这方面经验不足。

  10. 如何在前端单击按钮时调用后端中的函数?

    那么如何在后端添加一个新的端点,点击按钮调用这个函数。

返回
顶部