亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

使用 pool.imap 時無法腌制 psycopg2.extensions.connection

使用 pool.imap 時無法腌制 psycopg2.extensions.connection

夢里花落0921 2022-10-06 19:31:33
我正在嘗試構建一個應用程序,它將“檢查”一個單元格,該單元格是一個覆蓋地理數據庫中一部分土地的正方形,并對該單元格內的特征進行分析。由于我要處理許多單元格,因此我使用的是多處理方法。我讓它在我的對象內部有點像這樣工作:class DistributedGeographicConstraintProcessor:    ...    def _process_cell(self, conn_string):        conn = pg2.connect(conn_string)        try:            cur = conn.cursor()            cell_id = self._check_out_cell(cur)            conn.commit()            print(f"processing cell_id {cell_id}...")            for constraint in self.constraints:                # print(f"processing {constraint.name()}...")                query = constraint.prepare_distributed_query(self.job, self.grid)                cur.execute(query, {                    "buffer": constraint.buffer(),                    "cell_id": cell_id,                    "name": constraint.name(),                    "simplify_tolerance": constraint.simplify_tolerance()                })            # TODO: do a final race condition check to further suppress duplicates            self._check_in_cell(cur, cell_id)            conn.commit()        finally:            del cur            conn.close()        return None    def run(self):        while True:            if not self._job_finished():                params = [self.conn_string] * self.num_cores                processes = []                for param in params:                    process = mp.Process(target=self._process_cell, args=(param,))                    processes.append(process)                    sleep(0.1)  # Prevent multiple processes from checkout out the same grid square                    process.start()                for process in processes:                    process.join()            else:                self._finalize_job()                break但問題是它只會啟動四個進程并等到它們都完成后再啟動四個新進程。我想這樣當一個進程完成它的工作時,它會立即開始在下一個單元上工作,即使它的協同進程還沒有完成。我不確定如何實現這一點,我嘗試過使用這樣的池:def run(self):    pool = mp.Pool(self.num_cores)    unprocessed_cells = self._unprocessed_cells()    for i in pool.imap(self._process_cell, unprocessed_cells):        print(i)
查看完整描述

1 回答

?
拉風的咖菲貓

TA貢獻1995條經驗 獲得超2個贊

我的猜測是您將一些連接對象附加到self; 嘗試僅使用函數(無類/方法)重寫您的解決方案。


這是我前段時間使用的單生產者/多工人解決方案的簡化版本:


def worker(param):

    //connect to pg

    //do work



def main():

    pool = Pool(processes=NUM_PROC)

    tasks = []

    for param in params:

        t = pool.apply_async(utils.process_month, args=(param, ))

        tasks.append(t)

    pool.close()

    finished = false

    while not finished:     

        finished = True

        for t in tasks:

            if not t.ready():

                finished = False

                break

        time.sleep(1)


查看完整回答
反對 回復 2022-10-06
  • 1 回答
  • 0 關注
  • 114 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號