Positive-Influence-Data

[Joblib] Python으로 대량 CSV 병렬 처리 자동화하기 본문

Library

[Joblib] Python으로 대량 CSV 병렬 처리 자동화하기

DS쟁이 2025. 5. 22. 15:32

회사에서 업무를 하다 보면 같은 작업을 반복해야 할 때가 있다.

필자는 회사에서 약 15000개의 폴더 안에 있는 CSV파일을 열어서 특정 칼럼을 삭제하는 작업을 해야 했다.

평소 같았다면 Python으로 15,000개의 폴더 리스트를 만들고, 각 폴더 안에서 CSV 파일을 찾아 하나씩 Pandas로 읽은 뒤, 해당 칼럼을 삭제하고 저장했을 것이다.

시간만 투자하면 손쉽게 할 수 있는 일이다.

하지만 필자는 약 15000개의 폴더를 봐야 했고 시간이 한정적이라 이 작업을 병렬로 처리해야겠다고 판단했다. 

내가 가지고 있는 컴퓨팅자원을 최대한 활용해 보자

 

이 글에서는 joblib 라이브러리를 활용해 대량의 CSV 파일을 병렬로 빠르게 처리하는 방법을 소개한다.

 

🔍 joblib이란?

Python에서 병렬처리(parallel processing) 및 객체직렬화(object serialization)를 쉽게 할 수 있도록 도와주는 라이브러리 특히, CPU-bound 작업(연산이 많은 작업)에 적합하다.
  • 병렬 처리: 여러 CPU 코어를 활용하여 작업을 동시에 실행
  • 객체 직렬화: 복잡한 Python 객체를 파일로 저장하고 다시 불러올 수 있음 (dump, load)

👉 "병렬처리"와 "객체직렬화"중 필자는 병렬처리에 대해 서술하려고 한다.

🔍 joblib의 병렬 처리

joblib에서 병렬 처리는 매우 간단한 문법으로 사용할 수 있다.
특히 반복 작업(예: 폴더별 파일 처리, 이미지 전처리, 데이터 계산 등)을 여러 CPU 코어에 나눠서 빠르게 처리할 수 있다.

from joblib import Parallel, delayed

def 작업함수(x):
    # 여기에 반복할 작업을 작성
    return x * x

results = Parallel(n_jobs=-1)(delayed(작업함수)(i) for i in range(10))
print(results)

 

  • Parallel(n_jobs=-1): 사용 가능한 모든 CPU 코어를 사용.
    • n_jobs=10처럼 숫자를 지정하면 해당 개수만큼 코어 사용
  • delayed(작업함수)(i): 반복할 함수에 각각의 인자를 넘김.
    • 내부적으로 함수와 인자를 묶어 병렬 실행할 준비

📌 Tip : 내 PC의 활용가능한 CPU의 개수를 알아보려면?

import multiprocessing

print("사용 가능한 CPU 개수:", multiprocessing.cpu_count())

👉 multiprocessing 라이브러리를 통해 사용가능한 CPU의 개수를 알 수 있다. n_jobs를 조정하고 싶을 때 활용하면 좋을 듯하다.

👉 multiprocessing의 cpu_count는 물리코어와 논리코어 모두 합산하여 나온다.

🔍 joblib를 활용한 병렬처리 예시(1)

필자는 병렬처리를  업무에 적용하기에 앞서 얼마나 잘 작동하는지 확인하기 위해 테스트를 진행했다.

For문으로 0~99까지 총 100개를 반복실행하면서 For문의 숫자끼리 곱하는 즉, 제곱을 하려고 한다. 
For문에서 하나의 실행이 돌아갈 때마다 1초씩 쉬도록 만들면 병렬처리 하는 것과 하지 않는 것의 차이를 느낄 수 있을 것이다.

 

📌 병렬처리 X

import time

start = time.time()

def process(i):
    time.sleep(1)
    return i * i

results = [process(i) for i in range(100)]

print(f"총 실행 시간: {time.time() - start:.2f}초")

## 총 실행 시간: 100.63초

👉 병렬처리를 하지 않았으니 CPU가 하나만 돌아서 For문 100번이 돌 때 100초가 걸렸다.

 

 

📌 병렬처리 O

from joblib import Parallel, delayed
import time

start = time.time()

def process(i):
    time.sleep(1)  # 작업 시뮬레이션
    return i * i

results = Parallel(n_jobs=-1)(delayed(process)(i) for i in range(100))
print(f"총 실행 시간: {time.time() - start:.2f}초")

# 총 실행 시간: 5.06초

👉 병렬처리를 하고 For문 100번이 돌아서 5초가 걸렸다.
** 필자가 활용가능한 CPU의 개수는 20개

 

👉 필자의 컴퓨터에서 CPU가 20개가 한꺼번에 돌았기 때문에 1초에 20번씩 총 5초가 걸렸다. 복잡한 연산이 아니라면 확실히 빠르다.

 

🔍 joblib를 활용한 병렬처리 예시(2)

필자가 업무 하면서 활용했던 병렬처리를 살펴보려고 한다.

필자의 경우 약 15000개 폴더 안에 있는 CSV파일의 칼럼을 제거해야 했다. 아래 코드는 그 예시이다.

from joblib import Parallel, delayed
import pandas as pd
import os
from tqdm.notebook import tqdm # 주피터 노트북에서 실행하기 때문에 tqdm.notebook을 사용
import warnings

warnings.filterwarnings("ignore")

# 설정
base_dir = r"D:/Temp"
columns_to_drop = ["1", "2"]
folder_list = [folder for folder in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, folder))]
error_folders = []  # 오류 기록용 리스트

# tqdm + joblib 연동을 위한 wrapper
def tqdm_parallel_map(func, iterable, n_jobs=-1):
    with Parallel(n_jobs=n_jobs) as parallel:
        return list(tqdm(parallel(delayed(func)(i) for i in iterable), total=len(iterable)))

# 파일 처리 함수
def process_folder(folder_name):
    folder_path = os.path.join(base_dir, folder_name)
    try:
        # 폴더 내 CSV 파일 찾기 (가장 먼저 찾은 하나만 처리)
        csv_files = [f for f in os.listdir(folder_path) if f.endswith(".csv")]
        if not csv_files:
            return
        file_path = os.path.join(folder_path, csv_files[0])

        # CSV 읽기
        df = pd.read_csv(file_path, encoding='cp949') # 컬럼과 Value값이 한글로 된 csv를 불러와서 cp949로 인코딩 지정

        # 컬럼 삭제 (없어도 무시)
        df.drop(columns=columns_to_drop, inplace=True, errors='ignore') # 컬럼이 없어도 에러를 넘기기 위해서 errors='ignore' 를 사용

        # 덮어쓰기 저장
        df.to_csv(file_path, index=False, encoding='cp949')
    
    except Exception as e:
        error_folders.append(f"{folder_name} - {e}")

# 실행
tqdm_parallel_map(process_folder, folder_list)

# 로그 저장
if error_folders:
    with open("error_log.txt", "w", encoding="utf-8") as f:
        f.write("\n".join(error_folders))
    print(f"\n❗ 오류 발생 폴더 {len(error_folders)}개 - error_log.txt에 저장됨")
    # 에러로그를 저장하고 에러가 발생했을 시에 어떤 에러가 발생했는지 보기 위함
else:
    print("\n✅ 모든 폴더가 정상적으로 처리되었습니다.")

1.  Temp라는 폴더 안에 약 15000개의 폴더가 있다. folder_list 안에 그 15000개의 폴더 이름들이 들어간다.

2.  Temp 안에 있는 폴더들 각각을 병렬로 처리하며, 각 폴더에서 CSV 파일을 불러오고 pandas 연산(칼럼 제거 등)은 각각의 프로세스에서 단일 스레드로 수행된다.

3. 예외처리로 df안에 해당 칼럼이 없어도 무시하고 진행할 수 있게 하고 오류가 발생했을 때는 에러로그를 만들어 저장하게 된다.

 

👉 실행 결과 약 15000개의 폴더 안에 있는 CSV 파일들의 칼럼제거는 단 1초 만에 끝이 났고 필자가 원하는 대로 모든 파일들의 

 지정된 칼럼들이 제거되었다.

 

💡 마무리

이번 작업을 통해 joblib을 활용한 병렬 처리의 효과를 체감할 수 있었다. 단순 반복 작업도 병렬로 처리하면 처리 속도를 극적으로 줄일 수 있다는 점은 매우 인상 깊었다.

 

대량의 파일을 다루거나 계산이 많은 작업에서는 앞으로도 joblib뿐 아니라 multiprocessing, Dask, Ray 등의 도구들도 비교해 보며 적절한 도구를 선택할 계획이다.

 

Comments