3 回答

TA貢獻1966條經驗 獲得超4個贊
問題在于該counter變量未在您的進程之間共享:每個單獨的進程都在創建它自己的本地實例并對其進行遞增。
有關可用于在進程之間共享狀態的某些技術,請參閱文檔的本部分。在您的情況下,您可能希望Value在工作人員之間共享一個實例
這是示例的工作版本(帶有一些虛擬輸入數據)。請注意,它使用的是全局值,在實踐中我會盡量避免使用這些值:
from multiprocessing import Pool, Value
from time import sleep
counter = None
def init(args):
''' store the counter for later use '''
global counter
counter = args
def analyze_data(args):
''' increment the global counter, do something with the input '''
global counter
# += operation is not atomic, so we need to get a lock:
with counter.get_lock():
counter.value += 1
print counter.value
return args * 10
if __name__ == '__main__':
#inputs = os.listdir(some_directory)
#
# initialize a cross-process counter and the input lists
#
counter = Value('i', 0)
inputs = [1, 2, 3, 4]
#
# create the pool of workers, ensuring each one receives the counter
# as it starts.
#
p = Pool(initializer = init, initargs = (counter, ))
i = p.map_async(analyze_data, inputs, chunksize = 1)
i.wait()
print i.get()

TA貢獻1775條經驗 獲得超8個贊
沒有競爭條件錯誤的計數器類:
class Counter(object):
def __init__(self):
self.val = multiprocessing.Value('i', 0)
def increment(self, n=1):
with self.val.get_lock():
self.val.value += n
@property
def value(self):
return self.val.value
添加回答
舉報