亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

從 Apache Beam 讀取 CSV 并寫入 BigQuery

從 Apache Beam 讀取 CSV 并寫入 BigQuery

胡說叔叔 2022-01-18 17:49:09
我有一個 GCS 存儲桶,我試圖從中讀取大約 20 萬個文件,然后將它們寫入 BigQuery。問題是我無法創建與代碼配合良好的 PCollection。我正在關注本教程以供參考。我有這個代碼:from __future__ import absolute_importimport argparseimport loggingimport osfrom past.builtins import unicodeimport apache_beam as beamfrom apache_beam.io import ReadFromText, ReadAllFromTextfrom apache_beam.io import WriteToTextfrom apache_beam.metrics import Metricsfrom apache_beam.metrics.metric import MetricsFilterfrom apache_beam.options.pipeline_options import PipelineOptionsfrom apache_beam.options.pipeline_options import SetupOptionsfrom google.cloud import storageimport regex as re# storage_client = storage.Client()# bucket = storage_client.get_bucket('mybucket')## blobs = bucket.list_blobs()# l=list(blobs)# x=[y.name for y in l]# c=x[1:]# print(len(c))files = ['gs://mybucket/_chunk1',         'gs://mybucket/_chunk0']class DataIngestion:    """A helper class which contains the logic to translate the file into    a format BigQuery will accept."""    def parse_method(self, string_input):        x="""{}""".format(string_input)        rx = re.compile(r"""\{[^{}]+\}(*SKIP)(*FAIL)|,""")        d = {}        d['name'], d['date'], d['geometry'], d['value0'], d['value1'], d['value2']=rx.split(x)        d['geometry']=d['geometry'].strip('"')        return ddef run(argv=None):    """Main entry point; defines and runs the pipeline."""    data_ingestion = DataIngestion()    p = beam.Pipeline(options=PipelineOptions())    (p    | 'Create PCollection' >> beam.Create(files)    | 'Read from a File' >> beam.io.ReadAllFromText(skip_header_lines=1)    | 'String To BigQuery Row' >> beam.Map(lambda s:    data_ingestion.parse_method(s))    | 'Write to BigQuery' >> beam.io.Write(    beam.io.BigQuerySink(    'mytable',files問題是如果列表只有一個元素,這段代碼就可以完美運行。只要有超過 1 個元素,轉換“String To BigQuery Row”就會出錯并顯示error: nothing to repeat [while running 'String To BigQuery Row']。這可能與正則表達式模塊有關,但我無法弄清楚出了什么問題,因為它在給定 1 個文件時可以完美運行。
查看完整描述

1 回答

?
互換的青春

TA貢獻1797條經驗 獲得超6個贊

OP 的評論讓我意識到我的錯誤:預期的庫是regex,而不是 python 的 builtin re。


使用import regex as re不僅讓我感到困惑,而且還導致re庫拋出nothing to repeat錯誤。這是因為默認情況下 Dataflow 不會保存您的主會話。


當您的解析函數中的代碼正在執行時,它無法訪問re您在構建時導入的上下文。通常,這會失敗NameError,但由于您使用的是有效的庫名稱,因此代碼假定您指的是內置re庫并嘗試按原樣執行它。


如果您import regex改用,您會看到NameError: name 'regex' is not defined,這是代碼失敗的真正原因。為了解決這個問題,要么將導入語句移動到解析函數本身,要么--save_main_session作為選項傳遞給運行程序。有關更多詳細信息,請參見此處。


老答案:


雖然我不知道您使用的是哪個版本的 Python,但您對正則表達式的懷疑似乎是正確的。 *是一個特殊字符,表示它之前的重復,但它(是一個表示分組的特殊字符,所以類似的模式(*SKIP)在語法上似乎不正確。


在 Python 3.7 中,上述表達式甚至無法編譯:


python -c 'import re; rx = re.compile(r"""\{[^{}]+\}(*SKIP)(*FAIL)|,""")'

Traceback (most recent call last):

  File "<string>", line 1, in <module>

  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/re.py", line 234, in compile

    return _compile(pattern, flags)

  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/re.py", line 286, in _compile

    p = sre_compile.compile(pattern, flags)

  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_compile.py", line 764, in compile

    p = sre_parse.parse(p, flags)

  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 930, in parse

    p = _parse_sub(source, pattern, flags & SRE_FLAG_VERBOSE, 0)

  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 426, in _parse_sub

    not nested and not items))

  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 816, in _parse

    p = _parse_sub(source, state, sub_verbose, nested + 1)

  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 426, in _parse_sub

    not nested and not items))

  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 651, in _parse

    source.tell() - here + len(this))

re.error: nothing to repeat at position 11

Python 2.7.15 也不接受它:


python2 -c 'import re; rx = re.compile(r"""\{[^{}]+\}(*SKIP)(*FAIL)|,""")'

Traceback (most recent call last):

  File "<string>", line 1, in <module>

  File "/usr/lib/python2.7/re.py", line 194, in compile

    return _compile(pattern, flags)

  File "/usr/lib/python2.7/re.py", line 251, in _compile

    raise error, v # invalid expression

sre_constants.error: nothing to repeat

雖然我不知道您要匹配哪些字符串,但我懷疑您的某些字符需要轉義。例如"\{[^{}]+\}(\*SKIP)(\*FAIL)|,"


查看完整回答
反對 回復 2022-01-18
  • 1 回答
  • 0 關注
  • 210 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號