편집 기록

편집 기록
  • 프로필 nowp님의 편집
    날짜2021.08.27

    concurrent.futures의 ThreadPoolExecutor 사용 중 오류 알람없이 subprocess가 종료됩니다.


    안녕하세요.

    제목처럼 concurrent.futures 모듈 중 ThreadPoolExecutor 사용해서 두 개의 SubProcess를 진행시키면 처음엔 잘 작동하다가 어느 순간 둘중 하나 임의의 SubProcess 오류알람없이 종료됩니다. 원인이 뭔지 몰라서 해결법 찾기가 어렵네요. 도움 부탁드리겠습니다!!

    • 제 PC 사양
      • window10 환경
      • IDE : Spyder
      • Python : Python 3.7 version

    아래 문제가 되고 있는 코드입니다.

    def save_BI():
        global symbol
        symbols = [ 'BTC/USDT', 'ETH/USDT', 'XRP/USDT', 'EOS/USDT', 'TRX/USDT', 'XLM/USDT', 'WAVES/USDT' ]
        for symbol in symbols:
            get_bi_ohlcv()
    
    
    def save_UP():
        global symbol2
        symbols = [ 'BTC/KRW', 'ETH/KRW', 'XRP/KRW', 'EOS/KRW', 'TRX/KRW', 'XLM/KRW', 'WAVES/KRW' ]
        for symbol2 in symbols:
            get_up_ohlcv()    
    
    
    if __name__ == "__main__":   
        executor = ThreadPoolExecutor(max_workers=4)
        executor.submit(save_BI)
        executor.submit(save_UP)
    

    추가로, 현재 ccxt를 통해서 과거데이터를 받아오고 있는데요! 속도가 너무 느려서 multiprocess로 늘려보려했지만, 애초에 fetch_ohlcv 구문 자체가 너무 느린것 같습니다. 혹시 해결법이나 팁이 있으시면 부탁드리겠습니다.

    코드 전체 구문입니다.

    import os
    import sys
    import time
    import pandas as pd
    from pandas import ExcelWriter
    from concurrent.futures import ThreadPoolExecutor
    
    
    root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    sys.path.append(root + '/python')
    
    import ccxt  # noqa: E402
    
    msec = 1000
    minute = 60 * msec
    hold = 30
    
    binance = ccxt.binance({
        'rateLimit': 10000,
        'enableRateLimit': True,
        'options': {
        'defaultType': 'future',
        }
    })
    
    upbit = ccxt.upbit({
        'rateLimit': 10000,
        'enableRateLimit': True,
    })
    
    def get_bi_ohlcv():
        from_datetime = '2020-01-01 00:00:00'
        from_timestamp = binance.parse8601(from_datetime)
        data = []
        end = binance.parse8601('2021-08-25 00:00:00')
    
        while from_timestamp < end:
            writer = ExcelWriter(symbol+'.xlsx')
            try:
                candles = binance.fetch_ohlcv(symbol, '1m', from_timestamp)
                from_timestamp = candles[-1][0] + minute
                data += candles
                df = pd.DataFrame(data, columns=['Timestamp','Open','High','Low','Close', 'Volume'])
                df['Timestamp'] = pd.DataFrame(df['Timestamp'].apply(binance.iso8601))
                writer.save()
                print(symbol +' saved')
            except (ccxt.binanceError, ccxt.AuthenticationError, ccxt.binanceNotAvailable, ccxt.RequestTimeout) as error:
    
                print('Got an error', type(error).__name__, error.args, ', retrying in', hold, 'seconds...')
                time.sleep(hold)
    
    def get_up_ohlcv():
        from_datetime = '2020-01-01 00:00:00'
        from_timestamp = upbit.parse8601(from_datetime)
        data = []
        end = upbit.parse8601('2021-08-25 00:00:00')
    
        while from_timestamp < end:
            writer = ExcelWriter(symbol2+'.xlsx')
            try:
                candles = upbit.fetch_ohlcv(symbol2, '1m', from_timestamp)
                from_timestamp = candles[-1][0] + minute
                data += candles
                df = pd.DataFrame(data, columns=['Timestamp','Open','High','Low','Close', 'Volume'])
                df['Timestamp'] = pd.DataFrame(df['Timestamp'].apply(upbit.iso8601))
                writer.save()
                print(symbol2 +' saved')
            except (ccxt.upbitError, ccxt.AuthenticationError, ccxt.upbitNotAvailable, ccxt.RequestTimeout) as error:
    
                print('Got an error', type(error).__name__, error.args, ', retrying in', hold, 'seconds...')
                time.sleep(hold)
    
    
    def save_BI():
        global symbol
        symbols = [ 'BTC/USDT', 'ETH/USDT', 'XRP/USDT', 'EOS/USDT', 'TRX/USDT', 'XLM/USDT', 'WAVES/USDT' ]
        for symbol in symbols:
            get_bi_ohlcv()
    
    
    def save_UP():
        global symbol2
        symbols = [ 'BTC/KRW', 'ETH/KRW', 'XRP/KRW', 'EOS/KRW', 'TRX/KRW', 'XLM/KRW', 'WAVES/KRW' ]
        for symbol2 in symbols:
            get_up_ohlcv()    
    
    
    if __name__ == "__main__":   
        executor = ThreadPoolExecutor(max_workers=4)
        executor.submit(save_BI)
        executor.submit(save_UP)
    
    
  • 프로필 알 수 없는 사용자님의 편집
    날짜2021.08.27

    concurrent.futures의 ThreadPoolExecutor 사용 중 오류 알람없이 subprocess가 종료됩니다.


    **안녕하세요.
    제목처럼 concurrent.futures 모듈 중 ThreadPoolExecutor 사용해서 두 개의 SubProcess를 진행시키면
    처음엔 잘 작동하다가 어느 순간 둘중 하나 임의의 SubProcess 오류알람없이 종료됩니다...
    원인이 뭔지 몰라서 해결법 찾기가 어렵네요 ㅠㅠ
    도움 부탁드리겠습니다!!**
    
     제 PC 사양
    
    window10 환경
    
    IDE : Spyder
    
    Python : Python 3.7 version
    

    아래 문제가 되고 있는 코드입니다.

    def save_BI():
        global symbol
        symbols = [ 'BTC/USDT', 'ETH/USDT', 'XRP/USDT', 'EOS/USDT', 'TRX/USDT', 'XLM/USDT', 'WAVES/USDT' ]
        for symbol in symbols:
            get_bi_ohlcv()
    
    
    def save_UP():
        global symbol2
        symbols = [ 'BTC/KRW', 'ETH/KRW', 'XRP/KRW', 'EOS/KRW', 'TRX/KRW', 'XLM/KRW', 'WAVES/KRW' ]
        for symbol2 in symbols:
            get_up_ohlcv()    
    
    
    if __name__ == "__main__":   
        executor = ThreadPoolExecutor(max_workers=4)
        executor.submit(save_BI)
        executor.submit(save_UP)
    
    +++ 추가로..
    현재 ccxt를 통해서 과거데이터를 받아오고 있는데요! 
    속도가 너무 느려서 multiprocess로 늘려보려했지만, 
    애초에 fetch_ohlcv 구문 자체가 너무 느린것 같습니다.. 
    혹시 해결법이나 팁이 있으시면 부탁드리겠습니다.
    

    코드 전체 구문입니다.

    import os
    import sys
    import time
    import pandas as pd
    from pandas import ExcelWriter
    from concurrent.futures import ThreadPoolExecutor
    
    
    root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    sys.path.append(root + '/python')
    
    import ccxt  # noqa: E402
    
    msec = 1000
    minute = 60 * msec
    hold = 30
    
    binance = ccxt.binance({
        'rateLimit': 10000,
        'enableRateLimit': True,
        'options': {
        'defaultType': 'future',
        }
    })
    
    upbit = ccxt.upbit({
        'rateLimit': 10000,
        'enableRateLimit': True,
    })
    
    def get_bi_ohlcv():
        from_datetime = '2020-01-01 00:00:00'
        from_timestamp = binance.parse8601(from_datetime)
        data = []
        end = binance.parse8601('2021-08-25 00:00:00')
    
        while from_timestamp < end:
            writer = ExcelWriter(symbol+'.xlsx')
            try:
                candles = binance.fetch_ohlcv(symbol, '1m', from_timestamp)
                from_timestamp = candles[-1][0] + minute
                data += candles
                df = pd.DataFrame(data, columns=['Timestamp','Open','High','Low','Close', 'Volume'])
                df['Timestamp'] = pd.DataFrame(df['Timestamp'].apply(binance.iso8601))
                writer.save()
                print(symbol +' saved')
            except (ccxt.binanceError, ccxt.AuthenticationError, ccxt.binanceNotAvailable, ccxt.RequestTimeout) as error:
    
                print('Got an error', type(error).__name__, error.args, ', retrying in', hold, 'seconds...')
                time.sleep(hold)
    
    def get_up_ohlcv():
        from_datetime = '2020-01-01 00:00:00'
        from_timestamp = upbit.parse8601(from_datetime)
        data = []
        end = upbit.parse8601('2021-08-25 00:00:00')
    
        while from_timestamp < end:
            writer = ExcelWriter(symbol2+'.xlsx')
            try:
                candles = upbit.fetch_ohlcv(symbol2, '1m', from_timestamp)
                from_timestamp = candles[-1][0] + minute
                data += candles
                df = pd.DataFrame(data, columns=['Timestamp','Open','High','Low','Close', 'Volume'])
                df['Timestamp'] = pd.DataFrame(df['Timestamp'].apply(upbit.iso8601))
                writer.save()
                print(symbol2 +' saved')
            except (ccxt.upbitError, ccxt.AuthenticationError, ccxt.upbitNotAvailable, ccxt.RequestTimeout) as error:
    
                print('Got an error', type(error).__name__, error.args, ', retrying in', hold, 'seconds...')
                time.sleep(hold)
    
    
    def save_BI():
        global symbol
        symbols = [ 'BTC/USDT', 'ETH/USDT', 'XRP/USDT', 'EOS/USDT', 'TRX/USDT', 'XLM/USDT', 'WAVES/USDT' ]
        for symbol in symbols:
            get_bi_ohlcv()
    
    
    def save_UP():
        global symbol2
        symbols = [ 'BTC/KRW', 'ETH/KRW', 'XRP/KRW', 'EOS/KRW', 'TRX/KRW', 'XLM/KRW', 'WAVES/KRW' ]
        for symbol2 in symbols:
            get_up_ohlcv()    
    
    
    if __name__ == "__main__":   
        executor = ThreadPoolExecutor(max_workers=4)
        executor.submit(save_BI)
        executor.submit(save_UP)