Using a multiprocessing.Process subclass in Python
Viewed 691 times
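I want to write code built around a subclass that inherits from Python's multiprocessing.Process.
I have simplified the code as much as I could.
When arranging the numbers, building a large list of permutations is inefficient in both memory and run time, so I used a generator.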
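To illustrate the generator idea on its own, here is a minimal sketch. It uses itertools.permutations from the standard library as a stand-in for sympy's multiset_permutations (the two differ when the input has repeated values, since itertools.permutations does not remove duplicates), and first_permutations is a hypothetical helper name, not part of the code in this post. Only the requested number of permutations is ever produced.

```python
import itertools


def first_permutations(items, limit):
    """Return at most `limit` permutations of `items`.

    itertools.permutations is lazy, so islice stops after `limit`
    results and the remaining permutations are never generated.
    """
    return list(itertools.islice(itertools.permutations(items), limit))


# 10 items have 3,628,800 permutations; only 3 are built here.
batch = first_permutations(range(10), 3)
print(len(batch))
```

The same islice call should work on any iterator, so it can wrap multiset_permutations in the same way.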
import itertools
import multiprocessing as mp
import numpy as np
import ctypes
import time
import random
from datetime import datetime
from sympy.combinatorics.permutations import Permutation
from sympy.utilities.iterables import multiset_permutations
from multiprocessing import Pool, Process, Queue, Manager, Value
def do_performance(i, j):
    """The function I want to evaluate."""
    time.sleep(1)
    i, j = np.ravel(i), np.ravel(j)
    ij = i + j
    ij = np.reshape(ij, (-1, 3))
    out = np.sum(ij, axis=1)
    out = np.sum(out, axis=0)
    return out
def generate_new_list(result, n_sample, n_replica, n_child):
    """Generate new lists based on the previously generated lists."""
    f_num = random.randint(0, n_sample-1)
    out = []
    for idx, i in enumerate(result):
        gen_iter, set_of_replica = 0, i[0]  # set of replicas to modify: list
        while gen_iter < n_child:
            # randomly draw position indices of the frame to change within each set_of_replica; only the first one is used
            exchange_idx = np.random.choice(np.arange(n_replica-1)[1:-2], n_replica - 2, replace=True)[0]
            # to avoid duplicate frame indices, use only a frame index that is not already in set_of_replica
            while f_num in set_of_replica:
                f_num = random.randint(0, n_sample - 1)
            # assign new frame index (f_num) at new position (exchange_idx) in set_of_replica
            i[0][exchange_idx] = f_num
            out.append(i[0])
            gen_iter += 1
    print(len(out))
    return out
class Worker(Process):
    def __init__(self, MDarr, in_queue, out_queue):
        super(Worker, self).__init__()
        self.MDarr = MDarr
        self.in_queue = in_queue
        self.out_queue = out_queue

    def mcSimulation(self, replicaData, MDarr):
        out = [(i, do_performance(MDarr[0][i, :, :], MDarr[1][i, :, :])) for i in replicaData]
        out.sort(key=lambda x: x[1])
        return out[: 100]

    def run(self):
        while True:
            input_list = self.in_queue.get()
            # sleep to allow the other workers a chance (b/c the work action is too simple)
            time.sleep(1)
            # put the transformed work on the queue and do simulation
            self.out_queue.put(self.mcSimulation(input_list, self.MDarr))
class MonteCarlo:
    def __init__(self):
        super(MonteCarlo, self).__init__()
        self.initialize_simulation()

    def initialize_simulation(self):
        # define variables
        self.n_proc, self.n_replica = 5, 10
        dt = 4e-12
        # generate randomized numpy array for example
        self.initIdx, self.endIdx = 135, 60918
        self.n_sample, self.n_atom = 50000, 22
        self.MDarr = np.random.normal(size=(self.n_sample, self.n_atom, 3))
        # generate some lists as input for mp.Manager.Queue()
        index_pool = np.random.choice(np.arange(self.n_sample), self.n_replica,)
        set_of_list = [multiset_permutations(index_pool) for i in range(self.n_proc)]
        self.input_list = [list(itertools.islice(i, 0, 1000))[: 100] for i in set_of_list]
        # generate shared_memory array
        self.sharedBaseArr = mp.Array(
            ctypes.c_double, (2 * self.n_sample * self.n_atom * 3), lock=False)
        self.main_NpArray = np.frombuffer(
            self.sharedBaseArr, dtype=ctypes.c_double).reshape(2, self.n_sample, self.n_atom, 3)
        np.copyto(self.main_NpArray, self.MDarr)
        assert self.main_NpArray.base.base is self.sharedBaseArr, f'shared base array has different shape with main numpy array'
        # generate mp.Manager.Queue object for data sharing
        self.replica_manager = mp.Manager()
        self.in_queue, self.out_queue = self.replica_manager.Queue(), self.replica_manager.Queue()
        return None
    def mc_scheduler(self, result):
        """Reset the scheduler counter only when a value lower than the previous minimum is produced."""
        if self.previous_result == 0:
            print('update scheduler')
            self.scheduler_val = 0
            self.previous_result = result
            self.min_val = result[0][1]
        elif self.previous_result != 0:
            self.optimal = result
            self.out = [i for i in result if i[1] < self.min_val]
            if len(self.out) == 0:
                self.scheduler_val += 1
                self.min_val = self.min_val
            else:
                # reset mc_scheduler
                self.scheduler_val = 0
                self.min_val = self.out[0][1]
    def run(self):
        s1time = time.time()
        n_child = 5
        print(f"Start code {datetime.now()}")
        print(f"construct the {self.n_proc} workers (mp.Process)")
        print(f"fork and start child process")
        workers = [Worker(self.main_NpArray, self.in_queue, self.out_queue) for name in range(self.n_proc)]
        [worker.start() for worker in workers]
        print("add data to the manager.queue for multi-processes")
        [self.in_queue.put(replica_set) for replica_set in self.input_list]
        print("update initial results")
        self.previous_result = 0
        result = [i for i in self.out_queue.get()]
        self.mc_scheduler(result)
        while self.scheduler_val < 100:
            # From the action value obtained from each process, get Action results from self.out_queue
            result = [i for i in self.out_queue.get()]
            print(f"result: {result}")
            # compare previous results
            self.mc_scheduler(result)
            # generate the new input list
            new_input_list = generate_new_list( result, self.n_sample, self.n_replica, n_child)
            # set_of_list = np.random.choice(np.arange(self.n_sample), self.n_replica,)
            # new_input_list = list(itertools.islice(multiset_permutations(set_of_list), 0, 1000))
            [self.in_queue.put(new_input_list) for input_list in new_input_list]
if __name__ == '__main__':
    sample_obj = MonteCarlo()
    sample_obj.run()
1) When I run the code, it does not make use of all n_proc processes.
How can I fix this?
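One way to check how many workers actually pick up work might be to strip the pattern down and tag each result with the process id that produced it. The sketch below is only an illustration under several assumptions: PidWorker, split_into_tasks and run_demo are hypothetical names, it uses a plain mp.Queue rather than the Manager queue from the code above, and it adds a None sentinel so the workers can exit. Each put() hands one task to exactly one worker, so the work is split into several tasks before being queued.

```python
import multiprocessing as mp
import os


def split_into_tasks(items, n_tasks):
    """Split items into n_tasks contiguous chunks of near-equal size."""
    size, extra = divmod(len(items), n_tasks)
    chunks, start = [], 0
    for k in range(n_tasks):
        # the first `extra` chunks each take one additional item
        stop = start + size + (1 if k < extra else 0)
        chunks.append(items[start:stop])
        start = stop
    return chunks


class PidWorker(mp.Process):
    """Process subclass that tags every result with the pid that produced it."""

    def __init__(self, in_queue, out_queue):
        super().__init__(daemon=True)
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        while True:
            task = self.in_queue.get()
            if task is None:  # sentinel: no more work for this worker
                break
            self.out_queue.put((os.getpid(), sum(task)))


def run_demo(n_proc=3, n_items=12):
    """Start n_proc workers, queue one task per worker, collect the results."""
    in_queue, out_queue = mp.Queue(), mp.Queue()
    workers = [PidWorker(in_queue, out_queue) for _ in range(n_proc)]
    for worker in workers:
        worker.start()
    tasks = split_into_tasks(list(range(n_items)), n_proc)
    for task in tasks:  # one put() per task
        in_queue.put(task)
    # wait for exactly as many results as tasks were queued
    results = [out_queue.get(timeout=30) for _ in tasks]
    for _ in workers:  # one sentinel per worker
        in_queue.put(None)
    for worker in workers:
        worker.join(timeout=30)
    return results


if __name__ == "__main__":
    for pid, total in run_demo():
        print(f"pid {pid} -> {total}")
```

With tasks this small a single worker may take more than one of them, so the set of pids printed can be smaller than n_proc. Putting a time.sleep inside run, as the original Worker does, should make the distribution across processes easier to see.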
Unknown user