티스토리 뷰
time 출력
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import numpy as np
from torch.autograd import Variable
import pandas as pd
from glob import glob
from torch.utils.data import TensorDataset, DataLoader
import os
import torch.optim as optim
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import matplotlib.pyplot as plt
# Directory that holds the cleaned CSV recordings.
data_dir = './Data/CleanData'
# Collect every CSV file. glob returns files in arbitrary order, so sort them
# to make the concatenated sample stream reproducible between runs.
csv_files = sorted(glob(os.path.join(data_dir, '*.csv')))
if not csv_files:
    raise FileNotFoundError(f"No CSV files found in {data_dir!r}")
# Accumulators for the samples of all files.
all_temp = []
all_time = []
for fname in csv_files:
    df = pd.read_csv(fname)
    # Column 1 is assumed to hold the temperature and column 0 the time stamp
    # (TODO confirm against the CSV layout). Reading column 1 for both would
    # make the time target an exact copy of the temperature input.
    temp = df.iloc[:, 1].values
    time = df.iloc[:, 0].values
    all_temp.extend(temp)
    all_time.extend(time)
# Reshape both streams to (n_samples, 1) column vectors.
all_temp_np = np.array(all_temp).reshape(-1, 1)
all_time_np = np.array(all_time).reshape(-1, 1)
# Convert to PyTorch tensors.
all_temp_tensor = torch.Tensor(all_temp_np)
all_time_tensor = torch.Tensor(all_time_np)
print("Loaded data shape:", all_temp_tensor.shape)
print("Loaded data shape:", all_time_tensor.shape)
# Number of consecutive samples per sequence.
seq_length = 3600
def create_sequences(temp, time, seq_length):
    """Cut two sample streams into non-overlapping windows of ``seq_length``.

    Args:
        temp: Indexable sequence of input samples (list, ndarray or CPU tensor).
        time: Indexable sequence of label samples; windowed independently of
            ``temp``.
        seq_length: Number of samples per window; must be positive.

    Returns:
        Tuple ``(sequences, labels)`` of NumPy arrays whose first two axes are
        ``(n_windows, seq_length)``. A trailing remainder shorter than
        ``seq_length`` is dropped.

    Raises:
        ValueError: If ``seq_length`` is not positive.
    """
    if seq_length <= 0:
        raise ValueError("seq_length must be a positive integer")

    def _windows(values):
        # Copy the usable prefix once, then reshape it instead of slicing
        # window by window in Python.
        n_windows = len(values) // seq_length
        flat = np.array(values[:n_windows * seq_length])
        return flat.reshape((n_windows, seq_length) + flat.shape[1:])

    return _windows(temp), _windows(time)
def pad_features(sequences, seq_length):
    """Zero-pad every sequence to ``seq_length`` steps and stack the results.

    Args:
        sequences: Iterable of array-likes whose first axis is the step axis.
        seq_length: Target length of the first axis.

    Returns:
        A tensor produced by ``torch.stack`` over the padded sequences.
        Sequences already at least ``seq_length`` long are left unchanged.
    """
    padded_sequences = []
    for seq in sequences:
        missing = seq_length - len(seq)
        if missing > 0:
            seq = np.asarray(seq)
            # Pad only at the end of the first axis. A single (before, after)
            # pair would be applied to every axis and widen the feature axis
            # of 2-D sequences as well.
            pad_width = [(0, missing)] + [(0, 0)] * (seq.ndim - 1)
            seq = np.pad(seq, pad_width, 'constant', constant_values=0)
        padded_sequences.append(torch.tensor(seq))
    return torch.stack(padded_sequences)
X, y = create_sequences(all_temp_tensor, all_time_tensor, seq_length)
# Hold out 20% of the sequences for testing.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Convert the splits to PyTorch tensors. The targets stay floating point:
# casting them to long would silently drop any fractional part before they
# reach the loss function.
X_train_tensor = torch.Tensor(X_train)
X_test_tensor = torch.Tensor(X_test)
y_train_tensor = torch.Tensor(y_train)
y_test_tensor = torch.Tensor(y_test)
# Wrap the tensors in datasets.
train_data = TensorDataset(X_train_tensor, y_train_tensor)
test_data = TensorDataset(X_test_tensor, y_test_tensor)
# Build the data loaders.
batch_size = 64
train_loader = DataLoader(train_data, shuffle=True, batch_size=batch_size)
test_loader = DataLoader(test_data, shuffle=True, batch_size=batch_size)
# Print one batch as a sanity check.
samp_dataiter = iter(train_loader)
sample_x, sample_y = next(samp_dataiter)
print('Sample input size: ', sample_x.size())
print('Sample input: \n', sample_x)
print()
print('Sample label size: ', sample_y.size())
print('Sample label: \n', sample_y)
class SimpleRNN(nn.Module):
    """RNN followed by a linear head applied to the last time step.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the RNN hidden state.
        output_size: Number of values produced by the linear head.
        num_layers: Number of stacked RNN layers.
        bidirectional: Whether the RNN also processes the sequence in reverse.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers, bidirectional):
        super(SimpleRNN, self).__init__()
        self.input_size, self.hidden_size, self.output_size = input_size, hidden_size, output_size
        self.num_layers = num_layers
        self.num_directions = 2 if bidirectional else 1
        # Build the RNN from the constructor arguments so that its expected
        # hidden-state shape always matches what init_hidden() produces.
        self.rnn = nn.RNN(input_size, hidden_size, num_layers=num_layers,
                          bidirectional=bidirectional, batch_first=True)
        # A bidirectional RNN concatenates both directions on the feature axis.
        self.fc = nn.Linear(hidden_size * self.num_directions, output_size)

    def forward(self, x):
        """Return ``(tanh(fc(last step)), final hidden state)``.

        ``x`` is batch-first: ``(batch, seq, input_size)``.
        """
        self.batch_size = x.size(0)
        # Create the initial state on the same device and dtype as the input.
        h0 = self.init_hidden().to(device=x.device, dtype=x.dtype)
        out, hidden = self.rnn(x, h0)
        out = out[:, -1, :]  # keep only the output of the last time step
        out = self.fc(out)
        return torch.tanh(out), hidden

    def init_hidden(self):
        """Return a zero initial state of shape (layers * directions, batch, hidden).

        Reads ``self.batch_size``, which ``forward`` sets before calling this.
        """
        return torch.zeros(self.num_layers * self.num_directions, self.batch_size, self.hidden_size)
# Hyperparameters of the network.
input_size, hidden_size, output_size = 1, 4, 2
num_layers, bidirectional = 1, True
model = SimpleRNN(
    input_size=input_size,
    hidden_size=hidden_size,
    output_size=output_size,
    num_layers=num_layers,
    bidirectional=bidirectional,
)
# Loss function and optimizer.
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.0015)
def train_model(model, train_loader, optimizer, criterion, epochs=20):
    """Train ``model`` and return the mean training loss of every epoch.

    Args:
        model: Module whose call returns a pair ``(outputs, hidden)``.
        train_loader: Loader yielding ``(inputs, labels)`` batches; its
            ``dataset`` length is used to average the loss.
        optimizer: Optimizer over the model parameters.
        criterion: Loss called as ``criterion(outputs, labels)``.
        epochs: Number of passes over ``train_loader``.

    Returns:
        List with one per-sample average loss per epoch.
    """
    model.train()
    train_losses = []
    for epoch in range(epochs):
        epoch_loss = 0.0
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs, _ = model(inputs)
            targets = labels.float()
            # Squeeze only when the shapes disagree. An unconditional squeeze
            # turns (B, 1) outputs into (B,), which then broadcasts against
            # (B, 1) targets into a (B, B) loss matrix.
            if outputs.shape != targets.shape:
                outputs = outputs.squeeze()
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            # Weight the batch loss by batch size so the epoch value is a
            # per-sample mean even when the last batch is smaller.
            epoch_loss += loss.item() * inputs.size(0)
        epoch_loss = epoch_loss / len(train_loader.dataset)
        train_losses.append(epoch_loss)
        print(f'Epoch {epoch+1}/{epochs}, Loss: {epoch_loss}')
    return train_losses
# Run training and keep the per-epoch losses.
train_losses = train_model(model, train_loader, optimizer, criterion)
# Plot the training curve.
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(train_losses, label='Train Loss')
ax.set_xlabel('Epochs')
ax.set_ylabel('Loss')
ax.set_title('Training Loss Over Time')
ax.legend()
plt.show()
def evaluate_model(model, test_loader):
    """Run ``model`` over ``test_loader`` and collect flat predictions and targets.

    Args:
        model: Module whose call returns a pair ``(outputs, hidden)``.
        test_loader: Loader yielding ``(inputs, labels)`` batches.

    Returns:
        Tuple ``(predictions, actuals)`` of flat Python lists. Both are
        flattened the same way so that they line up element by element when
        outputs and labels have the same number of elements.
    """
    model.eval()
    predictions = []
    actuals = []
    with torch.no_grad():
        for inputs, labels in test_loader:
            outputs, _ = model(inputs)
            # reshape (not view) also handles non-contiguous tensors.
            predictions.extend(outputs.reshape(-1).tolist())
            actuals.extend(labels.reshape(-1).tolist())
    return predictions, actuals
# Evaluate on the held-out data.
predictions, actuals = evaluate_model(model, test_loader)
# Compute the regression metrics in one pass over the metric functions.
mae, mse, r2 = (
    metric(actuals, predictions)
    for metric in (mean_absolute_error, mean_squared_error, r2_score)
)
print(f"MAE: {mae}, MSE: {mse}, R^2: {r2}")
https://colab.research.google.com/drive/1tGVgs-50_MQ5VhctsLh32eh20RpHpXJY
# -*- coding: utf-8 -*-
"""temp-time_RNN0314_이거됨.ipynb의 사본
Automatically generated by Colaboratory.
Original file is located at
https://colab.research.google.com/drive/1tGVgs-50_MQ5VhctsLh32eh20RpHpXJY
"""
# Mount Google Drive at /content/drive (requires the google.colab package).
from google.colab import drive
drive.mount('/content/drive')
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
from glob import glob
from torch.utils.data import DataLoader, TensorDataset
import torch.optim as optim
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import os
# Dataset location inside the mounted Google Drive.
data_dir = '/content/drive/My Drive/SRC/CleanData0312'
# Load every CSV file. glob returns files in arbitrary order, so sort them to
# make the concatenated sample stream reproducible between runs.
csv_files = sorted(glob(os.path.join(data_dir, '*.csv')))
if not csv_files:
    raise FileNotFoundError(f"No CSV files found in {data_dir!r}")
all_temp = []
all_time = []
for fname in csv_files:
    df = pd.read_csv(fname)
    temp = df.iloc[:, 1].values  # column 1: temperature
    time = df.iloc[:, 0].values  # column 0: time stamp
    all_temp.extend(temp)
    all_time.extend(time)
MINUTES_IN_DAY = 24 * 60
# Encode the time stamps as (sin, cos) of their angle on a 24-hour circle.
# Assumes all_time is expressed in minutes and represents the time of day.
time_angle = 2 * np.pi * np.asarray(all_time, dtype=float) / MINUTES_IN_DAY
all_time_cyclic = np.column_stack((np.sin(time_angle), np.cos(time_angle)))
# Temperature as an (n_samples, 1) column; time as (n_samples, 2).
# Author's recorded run: all_temp_np (4606, 1), all_time_cyclic (4606, 2).
all_temp_np = np.array(all_temp).reshape(-1, 1)
all_time_np = all_time_cyclic
# Combined layout: column 0 = temperature, columns 1-2 = sin/cos of time.
all_data = np.concatenate((all_temp_np, all_time_np), axis=1)
# Convert to a PyTorch tensor.
all_data_tensor = torch.Tensor(all_data)
# Number of consecutive samples per sequence.
# Author's note: samples are 10 minutes apart, so e.g. 20 samples span
# 200 minutes (3 h 20 min).
seq_length = 50
def create_sequences(temp, time, seq_length):
    """Cut two sample streams into non-overlapping windows of ``seq_length``.

    Args:
        temp: Indexable sequence of input samples (list, ndarray or CPU tensor).
        time: Indexable sequence of label samples; windowed independently of
            ``temp``.
        seq_length: Number of samples per window; must be positive.

    Returns:
        Tuple ``(sequences, labels)`` of NumPy arrays whose first two axes are
        ``(n_windows, seq_length)``. A trailing remainder shorter than
        ``seq_length`` is dropped.

    Raises:
        ValueError: If ``seq_length`` is not positive.
    """
    if seq_length <= 0:
        raise ValueError("seq_length must be a positive integer")

    def _windows(values):
        # Copy the usable prefix once, then reshape it instead of slicing
        # window by window in Python.
        n_windows = len(values) // seq_length
        flat = np.array(values[:n_windows * seq_length])
        return flat.reshape((n_windows, seq_length) + flat.shape[1:])

    return _windows(temp), _windows(time)
# def pad_features(sequences, seq_length):
# # 시퀀스의 길이를 seq_length로 맞추기 위해 패딩 처리
# padded_sequences = []
# for seq in sequences:
# if len(seq) < seq_length:
# # 길이가 짧은 시퀀스는 패딩을 추가
# seq = np.pad(seq, (0, seq_length - len(seq)), 'constant', constant_values=0)
# padded_sequences.append(torch.tensor(seq))
# return torch.stack(padded_sequences)
# X, y = create_sequences(all_temp_tensor,all_time_tensor, seq_length)
# print("Shape of X before padding:", X.shape)
# print("Shape of y before padding:", y.shape)
# X = pad_features(X, seq_length)
# print("Shape of X after padding:", X.shape)
# Window the data: column 0 is the input, the remaining columns are the labels.
X, y = create_sequences(all_data_tensor[:, :1], all_data_tensor[:, 1:], seq_length)
# Hold out 20% of the sequences for testing.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Convert all four splits to PyTorch tensors in one go.
X_train_tensor, X_test_tensor, y_train_tensor, y_test_tensor = (
    torch.Tensor(split) for split in (X_train, X_test, y_train, y_test)
)
# Wrap the tensors in datasets.
train_data = TensorDataset(X_train_tensor, y_train_tensor)
test_data = TensorDataset(X_test_tensor, y_test_tensor)
# Build the data loaders.
batch_size = 15
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=True)
# Print one batch as a sanity check.
samp_dataiter = iter(train_loader)
sample_x, sample_y = next(samp_dataiter)
print('Sample input size: ', sample_x.size())
print('Sample input: \n', sample_x)
print('Sample label size: ', sample_y.size())
print('Sample label: \n', sample_y)
class SimpleRNN(nn.Module):
    """RNN followed by a linear head applied at every time step.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the RNN hidden state.
        output_size: Number of values produced per time step.
        num_layers: Number of stacked RNN layers.
        bidirectional: Whether the RNN also processes the sequence in reverse.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers, bidirectional):
        super(SimpleRNN, self).__init__()
        self.input_size, self.hidden_size, self.output_size = input_size, hidden_size, output_size
        self.num_layers = num_layers
        self.num_directions = 2 if bidirectional else 1
        # Kept for backward compatibility: this attribute has always stored
        # the direction count (2 or 1), not a boolean, so it must not be used
        # as a truth test.
        self.bidirectional = self.num_directions
        # Pass num_layers through so the RNN matches the h0 built in forward().
        self.rnn = nn.RNN(input_size, hidden_size, num_layers=num_layers,
                          bidirectional=bidirectional, batch_first=True)
        # A bidirectional RNN concatenates both directions on the feature axis.
        self.fc = nn.Linear(hidden_size * self.num_directions, output_size)

    def forward(self, x):
        """Map batch-first ``x`` (batch, seq, input_size) to (batch, seq, output_size)."""
        batch_size = x.size(0)
        h0 = torch.zeros(self.num_layers * self.num_directions, batch_size, self.hidden_size,
                         device=x.device, dtype=x.dtype)
        out, _ = self.rnn(x, h0)
        out = self.fc(out)
        return out
# One input feature per step and two outputs per step.
model_config = dict(input_size=1, hidden_size=4, output_size=2, num_layers=1, bidirectional=True)
model = SimpleRNN(**model_config)
# Loss function and optimizer.
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.0022)
def train_model(model, train_loader, optimizer, criterion, epochs=20):
    """Train ``model`` and return the mean training loss of every epoch.

    Args:
        model: Module whose call returns the output tensor.
        train_loader: Loader yielding ``(inputs, labels)`` batches; its
            ``dataset`` length is used to average the loss.
        optimizer: Optimizer over the model parameters.
        criterion: Loss called as ``criterion(outputs, labels)``.
        epochs: Number of passes over ``train_loader``.

    Returns:
        List with one per-sample average loss per epoch.
    """
    model.train()
    train_losses = []
    for epoch in range(epochs):
        epoch_loss = 0.0
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            targets = labels.float()
            # Squeeze only when the shapes disagree. An unconditional squeeze
            # drops the batch axis of a single-sample batch and turns (B, 1)
            # outputs into (B,), which broadcasts against (B, 1) targets into
            # a (B, B) loss matrix.
            if outputs.shape != targets.shape:
                outputs = outputs.squeeze()
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            # Weight the batch loss by batch size so the epoch value is a
            # per-sample mean even when the last batch is smaller.
            epoch_loss += loss.item() * inputs.size(0)
        epoch_loss = epoch_loss / len(train_loader.dataset)
        train_losses.append(epoch_loss)
        print(f'Epoch {epoch+1}/{epochs}, Loss: {epoch_loss}')
    return train_losses
# Run training and keep the per-epoch losses.
train_losses = train_model(model, train_loader, optimizer, criterion)
# Plot the training curve.
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(train_losses, label='Train Loss')
ax.set(xlabel='Epochs', ylabel='Loss', title='Training Loss Over Time')
ax.legend()
plt.show()
def evaluate_model(model, test_loader):
    """Collect the last-time-step prediction and label of every test sample.

    Args:
        model: Module whose call returns the output tensor.
        test_loader: Loader yielding ``(inputs, labels)`` batches.
            Outputs and labels are assumed to be batch-first,
            ``(batch, seq, ...)`` - TODO confirm against the model in use.

    Returns:
        Tuple ``(predictions, actuals)`` of lists with one entry per sample.
    """
    model.eval()
    predictions = []
    actuals = []
    with torch.no_grad():
        for inputs, labels in test_loader:
            outputs = model(inputs)
            # Index the sequence axis (axis 1). Indexing with a bare [-1]
            # would pick the last sample of the batch instead of the last
            # time step of each sample.
            predictions.extend(outputs[:, -1].tolist())
            actuals.extend(labels[:, -1].tolist())
    return predictions, actuals
# Evaluate on the held-out data.
predictions, actuals = evaluate_model(model, test_loader)
# Regression metrics.
mae = mean_absolute_error(y_true=actuals, y_pred=predictions)
mse = mean_squared_error(y_true=actuals, y_pred=predictions)
r2 = r2_score(y_true=actuals, y_pred=predictions)
print(f"MAE: {mae}, MSE: {mse}, R^2: {r2}")
# Persist the trained weights.
model_path = "/content/drive/My Drive/SRC/CleanData0312/my_rnn_model.pth"
torch.save(model.state_dict(), model_path)
# Read the weights back into the model and switch to evaluation mode.
saved_state = torch.load(model_path)
model.load_state_dict(saved_state)
model.eval()
학습이 될 리 없지만 일단 완료.
조도랑 IMU랑 다른 요소를 넣어서 추가 학습할 계획.
공지사항
최근에 올라온 글
최근에 달린 댓글
- Total
- Today
- Yesterday
링크
TAG
- Express
- StableDiffusion
- unity 360
- AI
- RNN
- JacobianMatrices
- motor control
- DeepLearning
- emotive eeg
- sequelize
- Arduino
- three.js
- 유니티플러그인
- CNC
- 라즈베리파이
- colab
- opencv
- Unity
- node.js
- oculuspro
- Python
- docker
- houdini
- Java
- ardity
- 후디니
- VR
- MQTT
- TouchDesigner
- 유니티
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
 | | | | | 1 | 2 |
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
글 보관함