파이썬 multiprocessing.Process subclass 이용하기

조회수 54회

파이썬의 multiprocess.Process을 상속받는 subclass을 만들어서 코드를 짜고자 합니다. 최대한 코드를 간단하게 바꿔봤습니다. 숫자를 배열할 때 permuatation하는 많은 양의 리스트 생성은 메모리나 실행 시간 측면에서 비효율적이므로 generator을 사용했습니다.

import itertools
import multiprocessing as mp
import numpy as np
import ctypes
import time
import random

from datetime import datetime
from sympy.combinatorics.permutations import Permutation
from sympy.utilities.iterables import multiset_permutations
from multiprocessing import Pool, Process, Queue, Manager, Value


def do_performance(i, j):
    """ a function what I want to calculate """
    time.sleep(1)
    i, j = np.ravel(i), np.ravel(j)
    ij = i + j
    ij = np.reshape(ij, (-1, 3))
    out = np.sum(ij, axis=1)
    out = np.sum(out, axis=0)
    return out

def generate_new_list(result, n_sample, n_replica, n_child):
    """ 기존에 생성된 리스트를 기반으로 새로운 리스트을 생성"""
    f_num = random.randint(0, n_sample-1)
    out = []

    for idx, i in enumerate(result):
        gen_iter, set_of_replica = 0, i[0]  # 바꿀 set of repclia: List

        while gen_iter < n_child:
        # 각 set_of_replica내 에서 변경될 frame의 위치 index 랜덤하게 생성에서 첫 번째만 사용
            exchange_idx = np.random.choice(np.arange(n_replica-1)[1:-2], n_replica - 2, replace=True)[0]
            # 같은 frame index의 중복을 막기 위해 기존의 set_of_replica에 없는 frame index만을 사용
            while f_num in set_of_replica:
                f_num = random.randint(0, n_sample - 1)

            # assign new frame index (f_num) at new position (exchange_idx) in set_of_replica
            i[0][exchange_idx] = f_num
            out.append(i[0])
            gen_iter += 1
print(len(out))
return out


class Worker(Process):
    def __init__(self, MDarr, in_queue, out_queue):
        super(Worker, self).__init__()
        self.MDarr = MDarr
        self.in_queue = in_queue
        self.out_queue = out_queue

    def mcSimulation(self, replicaData, MDarr):
        out = [(i, do_performance(MDarr[0][i, :, :], MDarr[1][i, :, :])) for i in replicaData]
        out.sort(key=lambda x: x[1])
        return out[: 100]

    def run(self):
        while True:
            input_list = self.in_queue.get()

            # sleep to allow the other workers a chance (b/c the work action is too simple)
            time.sleep(1)

            # put the transformed work on the queue and do simulation
            self.out_queue.put(self.mcSimulation(input_list, self.MDarr))


class MonteCarlo:
    def __init__(self, ):
        super(MonteCarlo, self).__init__()
        self.initialize_simulation()

    def initialize_simulation(self):
        # define variables
        self.n_proc, self.n_replica = 5, 10
        dt = 4e-12

        # generate randomized numpy array for example
        self.initIdx, self.endIdx = 135, 60918
        self.n_sample,  self.n_atom = 50000, 22
        self.MDarr = np.random.normal(size=(self.n_sample, self.n_atom, 3))

        # generate some of list as input into mp.Manager.Queue()
        index_pool = np.random.choice(np.arange(self.n_sample), self.n_replica,)
        set_of_list = [multiset_permutations(index_pool) for i in range(self.n_proc)]
        self.input_list = [list(itertools.islice(i, 0, 1000))[: 100]  for i in set_of_list]

        # generate shared_memory array
        self.sharedBaseArr = mp.Array(
            ctypes.c_double, (2 * self.n_sample * self.n_atom * 3), lock=False)
        self.main_NpArray = np.frombuffer(
            self.sharedBaseArr, dtype=ctypes.c_double).reshape(2, self.n_sample, self.n_atom, 3)
        np.copyto(self.main_NpArray, self.MDarr)
        assert self.main_NpArray.base.base is self.sharedBaseArr, f'shared base array has different         shape with main numpy array'

        # generate mp.Manager.Queue object for data sharing
        self.replica_manager = mp.Manager()
        self.in_queue, self.out_queue = self.replica_manager.Queue(), self.replica_manager.Queue()
        return None

    def mc_scheduler(self, result):
        """이전보다 낮은 각ㅄ을 내놓았을 때만 """
        if self.previous_result == 0:
            print('update scheduler')
            self.scheduler_val = 0
            self.previous_result = result
            self.min_val = result[0][1]

        elif self.previous_result != 0:
            self.optimal = result
            self.out = [i for i in result if i[1] < self.min_val]

            if len(self.out) == 0:
                self.scheduler_val += 1
                self.min_val = self.min_val
            else:
                # reset mc_scheduler
                self.scheduler_val = 0
                self.min_val = self.out[0][1]

    def run(self):
        s1time = time.time()
        n_child = 5
        print(f"Start code {datetime.now()}")
        print(f"construct the {self.n_proc} workers (mp.Process)")

        print(f"fork and start child process")
        workers = [Worker(self.main_NpArray, self.in_queue,  self.out_queue) for name in range(self.n_proc)]
        [worker.start() for worker in workers]

        print("add data to the manager.queue for multi-processes")
        [self.in_queue.put(replica_set) for replica_set in self.input_list]

        print("update initial results")
        self.previous_result = 0
        result = [i for i in self.out_queue.get()]
        self.mc_scheduler(result)

        while self.scheduler_val < 100:
            # From the action value obtained from each process, get Action results from self.out_queue
            result = [i for i in self.out_queue.get()]
            print(f"result: {result}")

            # compare previous results
            self.mc_scheduler(result)

            # generate the new input list
            new_input_list = generate_new_list(  result, self.n_sample, self.n_replica, n_child)
            # set_of_list = np.random.choice(np.arange(self.n_sample), self.n_replica,)
            # new_input_list = list(itertools.islice(multiset_permutations(set_of_list), 0, 1000))
            [self.in_queue.put(new_input_list)    for input_list in new_input_list]

if __name__ == '__main__':
    sample_obj = MonteCarlo()
    sample_obj.run()

1) 코드를 돌리면 n_proc개의 process을 사용하지 못 합니다.

어떻게 하면 문제를 해결할 수 있을까요?

답변을 하려면 로그인이 필요합니다.

Hashcode는 개발자들을 위한 무료 QnA 사이트입니다. 계정을 생성하셔야만 답변을 작성하실 수 있습니다.

(ಠ_ಠ)
(ಠ‿ಠ)

ᕕ( ᐛ )ᕗ
로그인이 필요합니다

Hashcode는 개발자들을 위한 무료 QnA사이트 입니다. 계정을 생성하셔야만 글을 작성하실 수 있습니다.