I am trying to build an application that will "check out" a cell, which is a square covering a portion of land in a geodatabase, and run an analysis of the features inside that cell. Since I have many cells to process, I am using a multiprocessing approach. I had it working somewhat like this inside my object:

import multiprocessing as mp
from time import sleep

import psycopg2 as pg2


class DistributedGeographicConstraintProcessor:

    ...

    def _process_cell(self, conn_string):
        conn = pg2.connect(conn_string)
        try:
            cur = conn.cursor()

            cell_id = self._check_out_cell(cur)
            conn.commit()
            print(f"processing cell_id {cell_id}...")

            for constraint in self.constraints:
                # print(f"processing {constraint.name()}...")
                query = constraint.prepare_distributed_query(self.job, self.grid)
                cur.execute(query, {
                    "buffer": constraint.buffer(),
                    "cell_id": cell_id,
                    "name": constraint.name(),
                    "simplify_tolerance": constraint.simplify_tolerance()
                })

            # TODO: do a final race condition check to further suppress duplicates
            self._check_in_cell(cur, cell_id)
            conn.commit()

        finally:
            del cur
            conn.close()

        return None

    def run(self):
        while True:
            if not self._job_finished():
                params = [self.conn_string] * self.num_cores
                processes = []
                for param in params:
                    process = mp.Process(target=self._process_cell, args=(param,))
                    processes.append(process)
                    sleep(0.1)  # Prevent multiple processes from checking out the same grid square
                    process.start()
                for process in processes:
                    process.join()
            else:
                self._finalize_job()
                break

The problem is that this only spawns four processes and waits until all of them have finished before spawning four new ones. I want it to work so that when one process finishes its work, it immediately starts on the next cell, even if its co-processes are not yet finished. I am not sure how to implement this, and I have tried using a pool like this:

    def run(self):
        pool = mp.Pool(self.num_cores)
        unprocessed_cells = self._unprocessed_cells()
        for i in pool.imap(self._process_cell, unprocessed_cells):
            print(i)
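For reference, the scheduling behaviour described above (a worker picking up the next cell as soon as it is free) is what multiprocessing.Pool already provides, as long as the function handed to the pool can be pickled. Below is a minimal, self-contained sketch using hypothetical names (process_item and a plain range of items) rather than the question's actual methods:

import multiprocessing as mp
import random
import time


def process_item(item):
    # Simulate tasks of uneven length; a worker that finishes early is
    # handed the next item right away instead of waiting for its siblings.
    time.sleep(random.uniform(0.1, 0.5))
    return f"done {item}"


if __name__ == "__main__":
    with mp.Pool(processes=4) as pool:
        # imap_unordered yields each result as soon as it is ready.
        for result in pool.imap_unordered(process_item, range(20)):
            print(result)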
1 Answer

拉風的咖菲貓
My guess is that you are attaching some connection object to self; try rewriting your solution using only functions (no classes/methods).
Here is a simplified version of a single-producer/multiple-workers solution I used a while ago:
import time
from multiprocessing import Pool

NUM_PROC = 4  # number of worker processes


def worker(param):
    # connect to pg
    # do work
    pass


def main():
    pool = Pool(processes=NUM_PROC)
    tasks = []
    for param in params:  # params: the iterable of work items to hand out
        t = pool.apply_async(worker, args=(param,))
        tasks.append(t)
    pool.close()

    # Poll until every asynchronous task reports that it is done.
    finished = False
    while not finished:
        finished = True
        for t in tasks:
            if not t.ready():
                finished = False
                break
        time.sleep(1)
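Applied to the question above, the same idea might look like the sketch below: the per-cell work lives in a module-level function that opens its own database connection (so nothing unpicklable hangs off self), and the pool hands out cells as workers become free. The names process_cell, CONN_STRING and the cell_ids list are illustrative assumptions, not part of the original code.

import multiprocessing as mp

import psycopg2 as pg2

CONN_STRING = "dbname=mydb user=me"  # hypothetical connection string


def process_cell(cell_id):
    # Each worker opens and closes its own connection; connection objects
    # cannot be pickled, so they must not be created in the parent process
    # and passed to the children.
    conn = pg2.connect(CONN_STRING)
    try:
        cur = conn.cursor()
        # ... run the per-cell analysis with cur here ...
        conn.commit()
    finally:
        conn.close()
    return cell_id


if __name__ == "__main__":
    cell_ids = [1, 2, 3, 4, 5]  # stand-in for the real list of unprocessed cells
    with mp.Pool(processes=4) as pool:
        # imap_unordered gives a free worker the next cell immediately,
        # without waiting for the other workers to finish theirs.
        for done in pool.imap_unordered(process_cell, cell_ids):
            print(f"finished cell {done}")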