

How to run a BigQuery query and then send the output CSV to Cloud Storage in Apache Airflow

胡說叔叔 2022-06-28 16:53:40
I need to run a BigQuery script in Python and have it output a CSV to Google Cloud Storage. Currently, my script triggers the BigQuery code and saves the file directly to my computer. However, I need it to run in Airflow, so I can't have any local dependencies. My current script saves the output to my local machine, and then I have to move the file to GCS. I've searched online but couldn't figure it out. (P.S. I'm very new to Python, so apologies in advance if this has been asked before!)

```python
import datetime

import pandas as pd
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials


def run_script():
    df = pd.read_gbq('SELECT * FROM `table/veiw` LIMIT 15000',
                     project_id='PROJECT',
                     dialect='standard')
    df.to_csv('XXX.csv', index=False)


def copy_to_gcs(filename, bucket, destination_filename):
    credentials = GoogleCredentials.get_application_default()
    service = discovery.build('storage', 'v1', credentials=credentials)
    body = {'name': destination_filename}
    req = service.objects().insert(bucket=bucket, body=body, media_body=filename)
    resp = req.execute()


current_date = datetime.date.today()
filename = (r"C:\Users\LOCALDRIVE\ETC\ETC\ETC.csv")
bucket = 'My GCS BUCKET'
str_prefix_datetime = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
destfile = 'XXX' + str_prefix_datetime + '.csv'
print('')
```
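As an aside on the `copy_to_gcs` step above: the newer `google-cloud-storage` client makes the upload considerably shorter than the raw `googleapiclient` call. A minimal sketch, assuming placeholder bucket and file names (the `google.cloud` import is done lazily so the helpers can be defined even without the library installed):

```python
import datetime


def build_dest_name(prefix, now=None):
    """Build a timestamped object name like 'XXX20220628_165340.csv'."""
    now = now or datetime.datetime.now()
    return prefix + now.strftime('%Y%m%d_%H%M%S') + '.csv'


def upload_csv_to_gcs(local_path, bucket_name, dest_name):
    """Upload a local file to GCS with the google-cloud-storage client."""
    # Imported lazily so this module can load without the library installed.
    from google.cloud import storage
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    bucket.blob(dest_name).upload_from_filename(local_path)


# Example usage (hypothetical names):
# upload_csv_to_gcs('XXX.csv', 'my-gcs-bucket', build_dest_name('XXX'))
```

This still uploads from a local file, though, which is exactly the local dependency the question is trying to avoid; the answer below removes it entirely by keeping the data inside Google Cloud.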

1 Answer

Smart貓小萌


Airflow provides several operators for working with BigQuery.

You can find an example in the Cloud Composer code samples that runs a query and then exports the result to CSV.

```python
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Query recent StackOverflow questions.
bq_recent_questions_query = bigquery_operator.BigQueryOperator(
    task_id='bq_recent_questions_query',
    sql="""
    SELECT owner_display_name, title, view_count
    FROM `bigquery-public-data.stackoverflow.posts_questions`
    WHERE creation_date < CAST('{max_date}' AS TIMESTAMP)
        AND creation_date >= CAST('{min_date}' AS TIMESTAMP)
    ORDER BY view_count DESC
    LIMIT 100
    """.format(max_date=max_query_date, min_date=min_query_date),
    use_legacy_sql=False,
    destination_dataset_table=bq_recent_questions_table_id)

# Export query result to Cloud Storage.
export_questions_to_gcs = bigquery_to_gcs.BigQueryToCloudStorageOperator(
    task_id='export_recent_questions_to_gcs',
    source_project_dataset_table=bq_recent_questions_table_id,
    destination_cloud_storage_uris=[output_file],
    export_format='CSV')
```
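For completeness, here is a sketch of how these two tasks might be wired into a DAG. The import paths follow the older Airflow 1.x contrib layout used in the Cloud Composer sample (they moved under provider packages in Airflow 2.x), and the project, dataset, table, and bucket names are placeholders; the Airflow imports are kept inside a function so the sketch can be read and imported without Airflow installed:

```python
import datetime


def build_recent_questions_dag():
    # Airflow imports kept local; these contrib paths match the
    # Airflow 1.x sample above and moved to provider packages in 2.x.
    from airflow import models
    from airflow.contrib.operators import bigquery_operator
    from airflow.contrib.operators import bigquery_to_gcs

    # Placeholder settings -- substitute your own project/dataset/bucket.
    bq_recent_questions_table_id = 'my_project.my_dataset.recent_questions'
    output_file = 'gs://my-bucket/recent_questions.csv'
    max_query_date = '2018-02-01'
    min_query_date = '2018-01-01'

    with models.DAG(
            'recent_questions_to_gcs',
            start_date=datetime.datetime(2022, 6, 28),
            schedule_interval=None) as dag:
        query = bigquery_operator.BigQueryOperator(
            task_id='bq_recent_questions_query',
            sql="SELECT owner_display_name, title, view_count "
                "FROM `bigquery-public-data.stackoverflow.posts_questions` "
                "WHERE creation_date < CAST('{max_date}' AS TIMESTAMP) "
                "AND creation_date >= CAST('{min_date}' AS TIMESTAMP) "
                "ORDER BY view_count DESC LIMIT 100".format(
                    max_date=max_query_date, min_date=min_query_date),
            use_legacy_sql=False,
            destination_dataset_table=bq_recent_questions_table_id)
        export = bigquery_to_gcs.BigQueryToCloudStorageOperator(
            task_id='export_recent_questions_to_gcs',
            source_project_dataset_table=bq_recent_questions_table_id,
            destination_cloud_storage_uris=[output_file],
            export_format='CSV')
        query >> export  # export runs only after the query task succeeds
    return dag
```

The key addition over the sample snippet is the `query >> export` dependency, which makes the export wait for the query task to finish, so the CSV lands in the bucket without the file ever touching a local machine.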


Answered 2022-06-28