# 필요한 라이브러리 임포트
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
# 데이터 전처리를 위한 라이브러리
from sklearn.preprocessing import StandardScaler
# 데이터 분할을 위한 라이브러리
from sklearn.model_selection import train_test_split
# 머신러닝 분류 모델 라이브러리
from sklearn.linear_model import LogisticRegression # 로지스틱 회귀 모델
from sklearn.neighbors import KNeighborsClassifier # K-최근접 이웃(KNN) 모델
from sklearn.tree import DecisionTreeClassifier # 의사결정나무 모델
from sklearn.ensemble import RandomForestClassifier # 랜덤 포레스트 모델
from xgboost import XGBClassifier # XGBoost 모델
from lightgbm import LGBMClassifier # LightGBM 모델
# 데이터 불균형 처리를 위한 SMOTE
from imblearn.over_sampling import SMOTE
# 모델 성능 평가를 위한 라이브러리
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.metrics import classification_report
# 딥러닝 라이브러리(TensorFlow & Keras)
import tensorflow as tf
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Dense, Dropout, BatchNormalization
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.optimizers import Adam
# TensorFlow의 랜덤 시드 설정 (재현성 확보)
tf.random.set_seed(100)
# 결측치 제거 및 결측치 개수 확인
df.dropna(inplace=True)
df.isna().sum()
# TotalCharges 열의 빈 문자열('') 데이터 처리
condition = df['TotalCharges'] == ''
df = df[-condition] # 빈 문자열 데이터 제거
# TotalCharges 열을 float 타입으로 변환
df['TotalCharges'] = df['TotalCharges'].astype('float')
# 데이터프레임 정보 확인
df.info()
# 종속변수 Churn 컬럼에 대해서 seaborn countplot으로 분포 확인
# 불균형 데이터
sns.countplot(data=df, x='Churn')
# 문자열(object) 타입의 컬럼들을 찾아서 cols 변수에 저장
cols = df.select_dtypes('object').columns.values
cols
# 범주형(문자열) 변수들을 더미 변수로 변환
# drop_first=True로 설정하여 다중공선성 방지
# dtype='int'로 설정하여 정수형으로 변환
df = pd.get_dummies(df, columns=cols, drop_first=True, dtype='int')
df.head(3)
3. 데이터 분리 및 분할
# 데이터 분리
x = df.drop('Churn_Yes', axis=1) # 예측에 사용할 특성들
y = df['Churn_Yes'] # 예측할 대상 변수
# 데이터 분할
x_train, x_test, y_train, y_test = train_test_split(
x, y,
test_size=0.3, # 테스트 데이터 30%
random_state=42, # 재현성을 위한 시드값
shuffle=True, # 데이터 섞기
stratify=y) # 타겟 비율 유지
x.train.shape, x_test.shape, y_train.shape, y_test.shape
4. 데이터 스케일링 및 불균형 처리
# 데이터 스케일링 (표준화)
ss = StandardScaler()
x_train = ss.fit_transform(x_train)
x_test = ss.transform(x_test)
# SMOTE를 사용하여 불균형 데이터 처리
smote = SMOTE(random_state=0)
# 학습 데이터에 SMOTE 적용하여 오버샘플링
x_train_over, y_train_over = smote.fit_resample(x_train, y_train)
5. 머신러닝 모델 생성 및 성능 평가
# 모델의 정확도를 출력하는 함수
def print_acc(modelName, model):
model.fit(x_train, y_train)
pred = model.predict(x_test)
print(f'{modelName}: {accuracy_score(pred, y_test)}')
# 다양한 머신러닝 모델들을 생성하고 각각의 정확도를 출력
lg = LogisticRegression()
knn = KNeighborsClassifier(n_neighbors=5)
dt = DecisionTreeClassifier()
rf = RandomForestClassifier()
xgb = XGBClassifier()
lgbm = LGBMClassifier()
modelName = [
'Logistic',
'KNN',
'DecisionTree',
'RandomForest',
'XGB',
'LGBM'
]
for index, model in enumerate([lg, knn, dt, rf, xgb, lgbm]):
print_acc(modelName[index], model)
6. 딥러닝 모델 구현 및 학습
# 간단한 이진 분류를 위한 신경망 모델 구조 정의
model = Sequential()
model.add(Dense(32, activation='relu', input_shape=(x_train.shape[-1],)))
model.add(BatchNormalization())
model.add(Dense(16, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(1, activation='sigmoid'))
model.summary()
# 모델 컴파일: Adam 옵티마이저, 이진 교차 엔트로피 손실 함수, 정확도 지표 사용
model.compile(
optimizer=Adam(),
loss='binary_crossentropy',
metrics=['accuracy']
)
# EarlyStopping과 ModelCheckpoint로 학습 과정을 모니터링하고 최적의 모델을 저장
es = EarlyStopping(monitor='val_loss', patience=5, verbose=1)
# ModelCheckpoint: 가장 좋은 모델을 'best_model.keras'로 저장
cp = ModelCheckpoint('best_model.keras', monitor='val_loss', verbose=1, save_best_only=True)
# 모델 학습: 50 에포크, 배치 사이즈 16으로 설정하고 조기 종료와 체크포인트 콜백 사용
history = model.fit(
x_train_over, y_train_over,
epochs=50,
batch_size=16,
verbose=1,
validation_data=(x_test, y_test),
callbacks=[es, cp]
)
# 저장된 최적의 모델을 불러와서 테스트 데이터에 대한 예측 정확도 계산
best_model = load_model('best_model.keras')
pred = best_model.predict(x_test)
result = []
for p in pred:
result.append(1 if p>0.5 else 0)
accuracy_score(result, y_test)
7. 예측 결과 저장
# 예측 확률값을 0 또는 1로 변환 (0.5 기준)
result = (np.array(result) >= 0.5).astype(int)
# ID 컬럼 (index는 1부터 시작)
index_values = np.arange(1, len(result) + 1)
# DataFrame 생성 (index & Churn)
submission = pd.DataFrame({
'index': index_values, # 1부터 시작하는 index
'Churn': result # 이탈 여부 (0: No, 1: Yes)
})
# CSV 파일 저장 (index 없이 저장)
submission.to_csv('churn_predictions.csv', index=False, encoding="utf-8-sig")
댓글