concurrent.futures의 ThreadPoolExecutor 사용 중 오류 알람없이 subprocess가 종료됩니다.

조회수 629회

안녕하세요.

제목처럼 concurrent.futures 모듈 중 ThreadPoolExecutor 사용해서 두 개의 SubProcess를 진행시키면 처음엔 잘 작동하다가 어느 순간 둘중 하나 임의의 SubProcess 오류알람없이 종료됩니다. 원인이 뭔지 몰라서 해결법 찾기가 어렵네요. 도움 부탁드리겠습니다!!

  • 제 PC 사양
    • window10 환경
    • IDE : Spyder
    • Python : Python 3.7 version

아래 문제가 되고 있는 코드입니다.

def save_BI():
    global symbol
    symbols = [ 'BTC/USDT', 'ETH/USDT', 'XRP/USDT', 'EOS/USDT', 'TRX/USDT', 'XLM/USDT', 'WAVES/USDT' ]
    for symbol in symbols:
        get_bi_ohlcv()


def save_UP():
    global symbol2
    symbols = [ 'BTC/KRW', 'ETH/KRW', 'XRP/KRW', 'EOS/KRW', 'TRX/KRW', 'XLM/KRW', 'WAVES/KRW' ]
    for symbol2 in symbols:
        get_up_ohlcv()    


if __name__ == "__main__":   
    executor = ThreadPoolExecutor(max_workers=4)
    executor.submit(save_BI)
    executor.submit(save_UP)

추가로, 현재 ccxt를 통해서 과거데이터를 받아오고 있는데요! 속도가 너무 느려서 multiprocess로 늘려보려했지만, 애초에 fetch_ohlcv 구문 자체가 너무 느린것 같습니다. 혹시 해결법이나 팁이 있으시면 부탁드리겠습니다.

코드 전체 구문입니다.

import os
import sys
import time
import pandas as pd
from pandas import ExcelWriter
from concurrent.futures import ThreadPoolExecutor


root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(root + '/python')

import ccxt  # noqa: E402

msec = 1000
minute = 60 * msec
hold = 30

binance = ccxt.binance({
    'rateLimit': 10000,
    'enableRateLimit': True,
    'options': {
    'defaultType': 'future',
    }
})

upbit = ccxt.upbit({
    'rateLimit': 10000,
    'enableRateLimit': True,
})

def get_bi_ohlcv():
    from_datetime = '2020-01-01 00:00:00'
    from_timestamp = binance.parse8601(from_datetime)
    data = []
    end = binance.parse8601('2021-08-25 00:00:00')

    while from_timestamp < end:
        writer = ExcelWriter(symbol+'.xlsx')
        try:
            candles = binance.fetch_ohlcv(symbol, '1m', from_timestamp)
            from_timestamp = candles[-1][0] + minute
            data += candles
            df = pd.DataFrame(data, columns=['Timestamp','Open','High','Low','Close', 'Volume'])
            df['Timestamp'] = pd.DataFrame(df['Timestamp'].apply(binance.iso8601))
            writer.save()
            print(symbol +' saved')
        except (ccxt.binanceError, ccxt.AuthenticationError, ccxt.binanceNotAvailable, ccxt.RequestTimeout) as error:

            print('Got an error', type(error).__name__, error.args, ', retrying in', hold, 'seconds...')
            time.sleep(hold)

def get_up_ohlcv():
    from_datetime = '2020-01-01 00:00:00'
    from_timestamp = upbit.parse8601(from_datetime)
    data = []
    end = upbit.parse8601('2021-08-25 00:00:00')

    while from_timestamp < end:
        writer = ExcelWriter(symbol2+'.xlsx')
        try:
            candles = upbit.fetch_ohlcv(symbol2, '1m', from_timestamp)
            from_timestamp = candles[-1][0] + minute
            data += candles
            df = pd.DataFrame(data, columns=['Timestamp','Open','High','Low','Close', 'Volume'])
            df['Timestamp'] = pd.DataFrame(df['Timestamp'].apply(upbit.iso8601))
            writer.save()
            print(symbol2 +' saved')
        except (ccxt.upbitError, ccxt.AuthenticationError, ccxt.upbitNotAvailable, ccxt.RequestTimeout) as error:

            print('Got an error', type(error).__name__, error.args, ', retrying in', hold, 'seconds...')
            time.sleep(hold)


def save_BI():
    global symbol
    symbols = [ 'BTC/USDT', 'ETH/USDT', 'XRP/USDT', 'EOS/USDT', 'TRX/USDT', 'XLM/USDT', 'WAVES/USDT' ]
    for symbol in symbols:
        get_bi_ohlcv()


def save_UP():
    global symbol2
    symbols = [ 'BTC/KRW', 'ETH/KRW', 'XRP/KRW', 'EOS/KRW', 'TRX/KRW', 'XLM/KRW', 'WAVES/KRW' ]
    for symbol2 in symbols:
        get_up_ohlcv()    


if __name__ == "__main__":   
    executor = ThreadPoolExecutor(max_workers=4)
    executor.submit(save_BI)
    executor.submit(save_UP)

답변을 하려면 로그인이 필요합니다.

프로그래머스 커뮤니티는 개발자들을 위한 Q&A 서비스입니다. 로그인해야 답변을 작성하실 수 있습니다.

(ಠ_ಠ)
(ಠ‿ಠ)