以下是“向现有 dataframe 添加哈希值”的任务如何从花费几天时间到消耗几乎整个冲刺的过程。
2022 年第二季度,我开始开发一个数据管道,该管道从 rest 服务获取市场数据并将其存储在 bigquery 表中。这是管道的高级解释。有趣的部分是如何查询数据,将其转换为 dataframe,然后使用 airflow 的gc++stobigqueryoperator上传 bigquery 表。
起初,写起来似乎很简单,但 airflow 的“幂等”原理给它增加了一些挑战。从该 rest 服务获取的内容由另一个表决定,即使 job 是幂等的,它用作参考的表也可能在两次运行之间发生变化。经过额外的时间,与数据工程师的沟通管道已于 2022 年第三季度末准备就绪。
快进到 2024 年第一季度。此时,我们有更多的用户访问数据,并且我们意识到我们的查询模式没有正确使用分区。或者更确切地说,我们想要基于字符串列访问数据,但无法在 bigquery 中对字符串列进行分区。这导致扫描大量数据并经常达到每日配额。
这促使我们考虑如何根据字符串列对数据进行分区。我们的数据工程师建议使用 farmhash 并附加模运算将该字符串列转换为整数。在概念验证中,这减少了近 90% 的扫描,查询性能提高了 3-5 倍。我们决定继续将此作为最终解决方案。我们所需要的只是:
- 创建带有farmhash指纹的表
- 更改管道来计算指纹
- 上传数据。
为了在Python中计算farmhash指纹,有一个pyfarmhash模块。我安装了该模块并使用下面的代码来计算哈希值,并且在本地一切都按预期工作。
1 2 3 4 |
|
随着所有测试的通过,现在是时候将代码推送到 airflow 并运行它了。我没想到在这个阶段会出现任何问题。事实上,我很高兴一切都按计划并在预计的时间内进行。
带着愉快的心情和充满信心,我推动了我的改变,开始了工作,然后等待了10-15分钟让它完成。与此同时,我切换到了另一项任务。很快,我收到了一封来自 airflow 的意外失败电子邮件。我查看了日志,惊讶地发现安装 pyfarmhash 模块时失败了!
为了帮助您理解问题,我需要解释一下作业的结构。该工作有以下步骤:
- 下载 parquet 格式的数据
- 上传到gcs存储桶
- 删除现有数据;如果有的话。 (避免重复数据)
- 将数据上传到bq表。
在此过程中,下载数据的task-1是一个单独的python模块。为了运行它,我使用了 airflow 中的pythonvirtua.envoperator。该操作符允许您根据需要指定包,然后将它们安装在新创建的虚拟环境中。安装包后,它的所有依赖项也会安装,您就可以开始使用了。
我添加了pyfarmhash作为下载数据的模块的依赖项,其他一切保持不变。但它失败了!为什么?
pyfarmhash是一个用c/c++实现的哈希库。安装后,它需要 gcc 来编译软件包,而 airflow 主机上不存在该软件包。在 airflow 主机上不安装 gcc 是有道理的,但不幸的是,这对我来说是一个障碍。
我寻找了 pyfarmhash 包的纯 python 实现,但没有。然后,我寻找车轮包装,但同样没有。我考虑过构建轮子包并推动它们,但这将导致在内部提供轮子包的长期责任。我想避免额外的、类似于解决方法的步骤。我探索了所有选项,并与维护 airflow 的团队讨论了它们。他们建议创建一个 docker 镜像并在 kubernetespodoperator 中运行它。这是一个不错的选择,因为我可以控制环境并包含所需的任何内容,而无需依赖外部环境。此外,该解决方案没有解决方法。唯一的短期缺点是需要更多时间来实施。
在开始使用基于 docker 的解决方案之前,我已经在这项任务上花费了大约 16-20 个小时。对于基于 docker 的解决方案,我还需要:
- 更改 python 包以具有开始下载和清除逻辑的入口点。
- 创建一个 docker 包并测试它(这是我的第二个 docker 镜像)。
由于我不再在 airflow 中使用 pythonvirtualenvoperator,我决定完全删除它并改进工作流程。我必须更改 python 包才能有入口点来开始下载和清除逻辑
我又花了 30-36 个小时才准备好了 docker 镜像的最终解决方案,这需要 6-7 个工作日,加上最初的 2 天,它变成了一项长期的冲刺任务。
我回想起来,想知道,我不得不放弃工作解决方案,更改模块结构,创建 docker 映像,更改 10 多个 airflow 作业以使用 docker 映像执行任务,处理这个现实并克服最初的挫败感。所有这一切只是因为,“单个 python 模块需要“gcc”来编译!”