Stock Trading AI Bot Using Python | ML Trading

The project at hand focuses on creating an AI system that can be used specifically for trading. It's all about making trading a bit smarter and easier.

Introduction

Trading, as we all know, is an exhilarating journey that requires skillful planning and a deep understanding of the ever-changing financial markets. That said, I would like to give a disclaimer before getting into the topic:
Dear readers, before we venture any further into trading AI and technical indicators, it's crucial to understand that the information provided in this article is for educational and informational purposes only. We are not offering any financial or stock recommendations, nor are we providing any investment advice.

These days, the finance and trading world is experiencing a big boost because of Artificial Intelligence. A lot of people are excited about using AI programs to help them trade and understand how money markets work. The project at hand focuses on creating an AI system that can be used specifically for trading. It's all about making trading a bit smarter and easier for everyone involved.

For this project, which I named Trado AI, we are utilizing a technique known as Deep Reinforcement Learning. If you're familiar with machine learning, you may recognize this term. It is a field of Artificial Intelligence dedicated to training Neural Network Agents to complete tasks through trial and error in a designated environment. A noteworthy example of this is AlphaGo, which was created by Google DeepMind. If you haven't heard of it, it's definitely worth a look. AlphaGo is an AI trained using Deep Reinforcement Learning to play the complex game of Go: it is placed in a Go environment where it can take specific actions, and it receives rewards for the good actions it takes. After millions of training iterations, the AI learned to play the game from scratch at a superhuman level. Our setup follows the same Reinforcement Learning idea as the AlphaGo example, but with a different environment and strategy.

Our environment is the stock market, and our strategy is to train a Deep RL Agent that can decide when to buy, sell, or hold a stock. That's enough of an introduction, let's get started!


Things Needed

Here are the things we need for this project:

  • Google Colab: I'm using Google Colab for the whole project. The great advantage of using Google Colab is that you don't need to install every library yourself, since many of them come pre-installed. You can also use your own local system or other software like SageMaker Studio Lab.
  • Dataset: For creating the environment, we need a dataset. I'm using the Reliance Dataset available here. This dataset consists of the price of the stock in a one-minute time frame.
  • Gym: Gym is an open-source environment library created by OpenAI. It has a collection of environments that can be used for RL tasks. It also supports custom environments.
  • Stable Baselines3 and sb3_contrib: Stable Baselines3 is an open-source Reinforcement Learning library that provides implementations of different Reinforcement Learning algorithms. sb3_contrib is a companion package to Stable Baselines3 that hosts experimental reinforcement learning algorithms.
  • Numpy: None of us can imagine a Machine Learning project without Numpy, the all-in-one mathematical library for scientific computing.
  • Pandas: Pandas is the library we use for managing our CSV dataset.
  • Sklearn: For this project, sklearn is used for scaling our dataset.

The Approach

Now, let's discuss how we are going to tackle this problem. As we said, our goal is to train an Agent to make meaningful trades. How can this be done? Well, we use the same idea that traders use: Technical Indicators. Technical Indicators help us make meaningful trades by summarizing the state of the market, such as price variations, momentum, and the direction of price movements. Two of the popular Technical Indicators are MACD (Moving Average Convergence Divergence) and RSI (Relative Strength Index), which give a sense of how the stock might perform in the near future. This is really important for our RL Agent.

Here are the steps involved,
  • Preprocess the dataset: First, we'll preprocess the dataset by removing unwanted columns, adding Technical Indicators, scaling, etc.
  • Custom Gym Environment: Using the preprocessed dataset we create a custom trading environment using Gym which maps some of the features of the real stock market.
  • Testing Environment: We'll test if the environment is performing correctly or not.
  • Train Agent: Then we train an agent using PPO (Proximal Policy Optimization). PPO is one of the state-of-the-art RL algorithms and can be used for a wide range of RL tasks. For this project, we are using a variant of PPO called RecurrentPPO, which uses an LSTM policy network. This helps the model learn the time-series nature of stock data.
  • Testing the Agent: Finally, we test the agent to see if it learned the policy for making good trades.

Data Preprocessing

Let's get into the first step. Before that, it is better to integrate your Colab with Your Google Drive account, which enables you to access the dataset anytime by just providing the path. You can also integrate these two platforms easily by using the following code:

from google.colab import drive
drive.mount('/content/drive')

The first and most important thing is to install the necessary packages for our project:

!pip install sb3_contrib shimmy gym stable_baselines3

Then import everything into the notebook:

import numpy as np
import pandas as pd
import gym
from gym import spaces
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.vec_env import DummyVecEnv
from sklearn.preprocessing import MinMaxScaler
from sb3_contrib import RecurrentPPO
import matplotlib.pyplot as plt

That's all we need, I guess. Now let's load the dataset using pandas:

dataset = pd.read_csv('/content/drive/MyDrive/Trado AI/Datasets/Reliance_dataset.csv')

As I have integrated Colab with my Drive account, it is easy to access the dataset using its path. There is an alternative option to upload the dataset to Colab directly, but it is not recommended: it eats into the session's available storage, the dataset is lost when the session is closed, and uploading takes a considerable amount of time every time. Therefore, it is better to go with the Drive integration instead.

Alright! We have the dataset loaded, let's get into preprocessing,

If you look at the dataset, you can see that it uses a one-minute time frame, but it is often better to use a 5-minute time frame since it reduces complexity while preserving the patterns. Sometimes 1-minute data is the better choice, but here we are using 5-minute data.

So we need to convert the 1-minute time frame to a 5-minute time frame, here is how you do it,

dataset.reset_index(drop=True, inplace=True)
keep_mask = (dataset.index + 1) % 5 == 0 # Keep every 5th row to form a 5-minute series

dataset = dataset[keep_mask]
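
Keeping every 5th row is a quick shortcut. If your dataset has a timestamp column, a more faithful way to build 5-minute candles is to aggregate the OHLC values with pandas' resample. Here is a minimal sketch; the column names ('date', 'open', 'high', 'low', 'close') are assumptions, so adjust them to match your CSV:

# Sketch: aggregate 1-minute OHLC rows into proper 5-minute candles.
# Column names ('date', 'open', ...) are assumptions -- match them to your CSV.
df_5min = (
    dataset
    .assign(date=pd.to_datetime(dataset['date']))  # parse the timestamp column
    .set_index('date')
    .resample('5min')                               # 5-minute buckets
    .agg({'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last'})
    .dropna()                                       # drop buckets where the market was closed
    .reset_index()
)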

Calculating Technical Indicators

This task involves calculating the technical indicators we have discussed such as MACD and RSI and adding them to the dataset. But why do we need to do this? You can also train the Agent without technical indicators, but it might take your model a considerable amount of time to learn some policies. Adding technical indicators can help the model to make more accurate decisions.

Calculating MACD (Moving Average Convergence Divergence)

MACD compares short-term and long-term exponential moving averages of the price. When the two converge, it signals that momentum is fading, and when they move apart, it signals that momentum is building. Beginner traders often say, "If the MACD crosses above the Signal line, buy; if it crosses below, sell." Basically, MACD hints at whether the price is likely to go up or down. Here is the Python function for calculating MACD and adding it to the dataset:

# Define a function to calculate MACD and its signal line
def calculate_macd(dataset, short_window, long_window, signal_window):
    # Calculate Exponential Moving Average (EMA) for short and long windows
    dataset['EMA_short'] = dataset['Close'].ewm(span=short_window, adjust=False).mean()  # Calculate short EMA
    dataset['EMA_long'] = dataset['Close'].ewm(span=long_window, adjust=False).mean()    # Calculate long EMA
   
    # Calculate MACD by subtracting long EMA from short EMA
    dataset['MACD'] = dataset['EMA_short'] - dataset['EMA_long']  # Calculate MACD line
   
    # Calculate the signal line (another EMA) for the MACD
    dataset['Signal'] = dataset['MACD'].ewm(span=signal_window, adjust=False).mean()  # Calculate signal line
   
    # Remove the intermediate EMA values, keeping only MACD and Signal columns
    dataset.drop(['EMA_short', 'EMA_long'], axis=1, inplace=True)  # Remove temporary EMA columns

If you don't get this, it's okay; neither did I the first time. It isn't important to understand every single line, just understand that we are calculating the MACD of the prices over a specific period and adding it, along with its signal line, as columns to the dataset.
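
If you want to see the crossover rule quoted earlier in code form, here is a tiny illustrative sketch (the agent never sees this column; it learns its own rules from the raw MACD and Signal values). It assumes calculate_macd has already been applied to some DataFrame df:

# Illustrative only: +1 where MACD is above its signal line, -1 where it is below.
# Assumes calculate_macd(df, 12, 26, 9) has already been applied to the DataFrame df.
crossover_side = np.where(df['MACD'] > df['Signal'], 1, -1)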

Calculating RSI (Relative Strength Index)

RSI is another technical indicator that tells whether a stock is overbought or oversold. When a stock is overbought, there is a chance it will be sold off in the near future (pushing the price down), and vice versa. This can be extremely useful for our Agent. Here is the function to calculate RSI:

# Define a function to calculate RSI (Relative Strength Index)
def calculate_rsi(dataset, window):
    # Calculate the differences between consecutive closing prices
    diff = dataset['Close'].diff()  # Calculate price differences
   
    # Separate gains (positive differences) and losses (negative differences)
    gain = diff.where(diff > 0, 0)  # Get positive differences as gains
    loss = -diff.where(diff < 0, 0)  # Get negative differences as losses
   
    # Calculate average gains and losses using a rolling window
    avg_gain = gain.rolling(window=window).mean()  # Calculate average gains
    avg_loss = loss.rolling(window=window).mean()  # Calculate average losses
   
    # Calculate the Relative Strength (RS) by dividing average gains by average losses
    rs = avg_gain / avg_loss  # Calculate Relative Strength
   
    # Calculate RSI by applying the RSI formula
    rsi = 100 - (100 / (1 + rs))  # Calculate RSI values
   
    # Add the calculated RSI values as a new column in the dataset
    dataset['RSI'] = rsi  # Add RSI values to the dataset
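
For intuition, the overbought/oversold reading mentioned above is commonly translated into the classic 70/30 thresholds. A small illustrative check, assuming calculate_rsi has already added the 'RSI' column to a DataFrame df (the agent itself only receives the raw RSI values):

# Rule of thumb (not used by the agent): RSI above 70 -> overbought, below 30 -> oversold.
overbought = df['RSI'] > 70
oversold = df['RSI'] < 30
print("Overbought rows:", overbought.sum(), "| Oversold rows:", oversold.sum())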

Alright! That's all we need for technical indicators. You can also explore more technical indicators and add them to the dataset, but remember that adding too many can potentially confuse the Agent. I chose two popular, well-established indicators.

Let's do one small thing: rename the columns. It is not strictly necessary, but it always confuses me whether dataset columns start with lowercase or capital letters, and the indicator functions above expect capitalized names like 'Close'.

dataset.rename(columns={"close":"Close", "open":"Open", "high":"High", "low":"Low"}, inplace=True)

Logarithmic Transformation of Dataset

Neural Networks train more reliably on small input values than on large ones, so we need to shrink the values of the dataset into a smaller range. This can be done using logarithms.

cleaned_df = dataset.copy() # Copy the dataset

eps=0.001

# Apply Logarithmic Transformation
cleaned_df['Open'] = np.log(cleaned_df.pop('Open')+eps)
cleaned_df['High'] = np.log(cleaned_df.pop('High')+eps)
cleaned_df['Low'] = np.log(cleaned_df.pop('Low')+eps)
cleaned_df['Close'] = np.log(cleaned_df.pop('Close')+eps)

Here I created a new variable called "cleaned_df" and assigned it a copy of the dataset so that we can differentiate between the original data and the cleaned data. After that, we applied "np.log" with a small "eps" value added to avoid mathematical errors when a value gets too close to zero.

After doing that, the OHLC columns of the dataset will contain log-scaled values.
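
One nice property of this transform is that it is fully reversible: np.exp(x) - eps recovers the original price, which is exactly what the environment's render() method will use later to print human-readable values. A quick sanity check:

# Sanity check: the log transform is invertible, so no price information is lost.
recovered_close = np.exp(cleaned_df['Close']) - eps
print(recovered_close.head())  # should match the 'Close' prices before the transform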


It's time to apply the technical indicators to the dataset,

calculate_macd(cleaned_df, 12, 26, 9)
calculate_rsi(cleaned_df, 14)
cleaned_df['MACD_Signal_diff'] = cleaned_df['MACD'] - cleaned_df['Signal']

One more column is added to the dataset, called "MACD_Signal_diff", which is the difference between the MACD and Signal lines. This value is negative when the MACD is below the signal line and positive when it is above, so a sign change marks a crossover. This can be incredibly helpful for our agent when making trades.

Still, the dataset has some NaN (Not a Number) values, let's remove them,

cleaned_df.dropna(inplace=True)

Alright! We have successfully preprocessed the dataset. Next, we need to build the environment for the agent to play in, so let's use the preprocessed dataset to make a custom environment with Gym.

Custom Trading Environment Using Gym

To create a custom trading environment, we can simply inherit the Gym environment and modify it as per our needs. For the trading environment, we need to consider a few things that align with the actual stock market,
  • Available money for trade
  • Maximum shares that can be bought with the money
  • Transactional cost
  • Profit/loss
Based on these factors, let's design a custom trading environment,

import gym
import numpy as np

INITIAL_BALANCE = np.log(5000 + eps) # Starting balance, converted to the same log scale as the prices

class TradingEnv(gym.Env):
    def __init__(self, df, initial_balance=INITIAL_BALANCE, max_shares=1, transaction_cost_percentage=0.00000000000000001):
        super(TradingEnv, self).__init__()

        self.df = df # Dataset
        self.current_step = 0 # Variable for iterating through each step
        self.max_steps = len(df) - 1 # Total number of steps / length of the dataset

        self.initial_balance = initial_balance # Balance for the first time
        self.max_shares = max_shares # Number of shares can be bought with the price
        self.transaction_cost_percentage = transaction_cost_percentage # Cost for each transaction

        self.action_space = gym.spaces.Discrete(3)  # Actions that the agent can take. 3 actions: Buy (0), Sell (1), Hold (2)

        # Observation space where the model can observe the data
        self.observation_space = gym.spaces.Box(low=-np.inf, high=np.inf, shape=(11,), dtype=np.float32)

        self.balance = self.initial_balance # Changing balance
        self.bought = False
        self.sold = False
        self.hold = False
        self.shares_held = 0
        self.stock_price = 0
        self.total_profit = 0

    def reset(self):

        """Reset the environment each time the agent completes iteration through the dataset"""

        self.current_step = 0
        self.balance = self.initial_balance
        self.shares_held = 0
        self.total_profit = 0
        self.stock_price = self.df['Close'].iloc[self.current_step]
        return self._get_observation()

    def step(self, action):

        """Method for taking action"""

        self.current_step += 1 # Iterating through the dataset features
        self.stock_price = self.df['Close'].iloc[self.current_step]
        reward = 0.0

        if action == 0:  # Buy
            self._buy_shares()

        elif action == 1:  # Sell
            self._sell_shares()

        else:
            self.hold = True
            reward = 0.001 # Give a small positive reward for holding when that is the sensible action
            pass

        # Only calculate reward when the agent performs Buy and Sell action
        if (self.bought or self.sold) and self.hold == False:
            reward = self._calculate_reward()

        done = self.current_step >= self.max_steps

        observation = self._get_observation()
        return observation, reward, done, {}

    def _get_observation(self):

        """The whole observation space the agent can interact with.
        Includes prices and technical indicators"""

        observation = np.array([
            self.df['Open'].iloc[self.current_step],
            self.df['Low'].iloc[self.current_step],
            self.df['High'].iloc[self.current_step],
            self.df['Close'].iloc[self.current_step],
            self.df['MACD'].iloc[self.current_step],
            self.df['Signal'].iloc[self.current_step],
            self.df['MACD_Signal_diff'].iloc[self.current_step],
            self.df['RSI'].iloc[self.current_step],
            self.balance,
            self.shares_held,
            self.stock_price,
        ], dtype=np.float32)

        return observation

    def _buy_shares(self):

        """Environment conditions for a buy action"""

        try:

            max_shares_affordable = int(self.balance / (self.stock_price * (1 + self.transaction_cost_percentage)))
            shares_to_buy = np.random.randint(1, max_shares_affordable + 1)  # Choose a random number of shares to buy
            transaction_cost = shares_to_buy * self.stock_price * self.transaction_cost_percentage
            self.balance -= (shares_to_buy * self.stock_price) + transaction_cost
            self.shares_held += shares_to_buy
            self.bought = True
            self.sold = False
            self.hold = False
        except:
            self.bought = False
            pass

    def _sell_shares(self):

        """Environment conditions for a sell action"""

        if self.shares_held > 0:
            shares_to_sell = np.random.randint(1, self.shares_held + 1)  # Choose a random number of shares to sell
            self.balance += (shares_to_sell * self.stock_price) * (1 - self.transaction_cost_percentage)
            self.shares_held -= shares_to_sell
            self.sold = True
            self.bought = False
            self.hold = False
        else:
            self.sold = False
            pass

    def _calculate_reward(self):

        """Reward calculation based on the agent's action"""

        current_balance = self.balance + (self.shares_held * self.stock_price)
        profit = current_balance - self.initial_balance # Profit calculation based on the current balance and initial balance
        reward = (profit - self.total_profit) * 200 # Reward scaled by a factor of 200
        self.total_profit = profit # updating profit
        return reward


    def render(self):

        """Information rendering to the screen"""
       
        print("Stock Price:", np.exp(self.stock_price)-eps)
        print("Account Balance:", np.exp(self.balance)-eps)
        print("Number of Shares:", self.shares_held)
        print("Profit:", np.exp(self.total_profit)-eps)
        print("--------------------------")

You can understand the purpose of each line of code by reading the comments. However, let me explain the overall workings of the environment:

In our environment, the agent can observe 11 values including the OHLC prices, technical indicators, stock price, shares, and account balance.
  1. Initially, the agent takes a random action (0 for buy, 1 for sell, 2 for hold) since it doesn't know what to do. Based on the action taken, the agent receives a reward that can be positive or negative, determined by the change in profit. If the profit is negative (i.e., a loss), the agent gets a negative reward; if the profit is positive, the agent gets a positive reward.
  2. The reward is only calculated for buy and sell actions.
  3. A small reward is given for holding when that is the sensible thing to do.
  4. Each buy or sell action changes the account balance and the number of shares held; a hold action leaves the portfolio unchanged.
  5. In each iteration, the agent receives the 5-minute prices from the dataset by interacting with the environment.
You can modify the environment according to your needs. The above code simulates the basic structure of the stock market including some portfolio and price changes after making trades.
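
For example, since all of these quantities are plain constructor arguments, you can tweak the simulation without touching the class itself; the numbers below are placeholders for illustration only:

# Illustrative values only -- pick numbers that match your own capital and broker fees.
custom_env = TradingEnv(
    cleaned_df,
    initial_balance=np.log(10000 + eps),  # starting capital, on the same log scale as the prices
    max_shares=1,                         # cap on the number of shares per position
    transaction_cost_percentage=0.0005,   # e.g. 0.05% cost per transaction
)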

If you want to learn more about Reinforcement Learning environments you can check Gym official documentation.

Can we test whether the environment is working properly? Yes, here is how you can do it:

env = TradingEnv(cleaned_df) # Initializing the environment
observation = env.reset() # Reset

# Testing with some actions

def test_env(action_):
    action = action_  # Action to test: 0 = buy, 1 = sell, 2 = hold
    next_observation, reward, done, info = env.step(action)
    print("Reward:", reward)
    print("Action:", action)
    env.render()
   
test_env(0)
test_env(1)
.....


---------

Reward: 0.0
Action: 0
Stock Price: 538.5999999999999
Account Balance: 9.282311765109982
Number of Shares: 1
Profit: 0.999
--------------------------
Reward: -0.16723934541928998
Action: 1
Stock Price: 537.7
Account Balance: 4991.645019411402
Number of Shares: 0
Profit: 0.9973290042164796
--------------------------

Nice, it seems to be working. You can test more actions and check that everything behaves properly.

Training the RecurrentPPO Agent

It's training time! We'll be using the RecurrentPPO algorithm with an LSTM policy network to train our agent on our custom trading environment. We chose LSTMs because they are excellent at recognizing patterns in sequential time-series data. LSTMs have a memory cell that stores relevant information from the past, which is especially helpful for predicting trends over time.

When the LSTM-with-PPO setup encounters a high-reward situation, the PPO update pushes the policy to favor the behavior that led to it. As the algorithm sees more observations with positive rewards, it gradually learns a policy that increases those rewards. In our scenario, if the agent makes a profitable trade and receives a larger reward, the model learns to seek more of those rewards, leading to an increase in profits.

The training itself is really easy to do (for us, not for the agent); here is how we do it:

model = RecurrentPPO('MlpLstmPolicy', env, verbose=1, ent_coef=0.70, device='cuda')
model.learn(1000000)

Here, I used the RecurrentPPO algorithm from "sb3_contrib". The ent_coef parameter helps adjust the exploration/exploitation balance. Exploration is the model's tendency to explore the environment by taking random actions and seeing how they turn out, while exploitation is when the agent sticks to a strategy it has already figured out instead of trying random actions. In Reinforcement Learning, we usually prefer a balance between the two.
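
If you want more control over that balance (and over the LSTM itself), RecurrentPPO also accepts the usual PPO hyperparameters plus policy keyword arguments. The values below are untuned, illustrative assumptions you could try in place of the call above:

# Illustrative, untuned hyperparameters -- just showing which knobs exist.
model = RecurrentPPO(
    'MlpLstmPolicy',
    env,
    verbose=1,
    ent_coef=0.01,                              # smaller entropy bonus -> less random exploration
    learning_rate=3e-4,
    n_steps=2048,                               # rollout length collected before each update
    gamma=0.99,                                 # discount factor for future rewards
    policy_kwargs=dict(lstm_hidden_size=128),   # size of the LSTM memory
    device='cuda',
)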

Once you execute the code cell, the agent will begin learning. I trained it for approximately one million time steps and got a pretty good result.

------------------------------------------
| rollout/                |              |
|    ep_len_mean          | 7.41e+04     |
|    ep_rew_mean          | 49           |
| time/                   |              |
|    fps                  | 271          |
|    iterations           | 7813         |
|    time_elapsed         | 3676         |
|    total_timesteps      | 1000064      |
| train/                  |              |
|    approx_kl            | 9.518117e-06 |
|    clip_fraction        | 0            |
|    clip_range           | 0.2          |
|    entropy_loss         | -1.1         |
|    explained_variance   | -0.2         |
|    learning_rate        | 0.0003       |
|    loss                 | -0.624       |
|    n_updates            | 78120        |
|    policy_gradient_loss | 4.98e-06     |
|    value_loss           | 0.0854       |
------------------------------------------

Testing the Agent

Let's test the agent. For that, we need to reset the environment and let the trained model make predictions:

env = TradingEnv(cleaned_df)
observation = env.reset()
observation_arr = []
action_arr = []

for i in range(50): # 50 * 5 minutes
    action, _ = model.predict(observation)
    obs, rewards, done, _ = env.step(action)
    observation_arr.append(obs)
    action_arr.append(action)
    print({"Rewards": rewards, "action": action})

After rendering the environment, here is what I got:

Stock Price: 320.8499999999999
Account Balance: 5380.210045281541
Number of Shares: 0
Profit: 1.0750419938479097
--------------------------

The total profit after investing 5000 is around 380, which is great, but there is still room for improvement.
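
One detail worth knowing: RecurrentPPO's LSTM carries a hidden state between steps, and the sb3_contrib documentation recommends feeding that state back into predict explicitly so the memory is actually used at inference time. The loop above works, but a state-aware version looks roughly like this (a sketch following the documented usage pattern):

# State-aware prediction: pass the LSTM hidden state back in at every step.
observation = env.reset()
lstm_states = None                          # LSTM cell/hidden state, None before the first step
episode_start = np.ones((1,), dtype=bool)   # True only on the first step of an episode

for i in range(50):
    action, lstm_states = model.predict(
        observation,
        state=lstm_states,
        episode_start=episode_start,
        deterministic=True,
    )
    observation, reward, done, _ = env.step(action)
    episode_start = np.array([done])        # reset the LSTM state when an episode ends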

Here is how you can plot the predictions made by the model,

# Combine observations into a single array
combined_observations = np.array(observation_arr)

# Create figure
plt.figure(figsize=(12, 6))

# Plot price movements (observation index 0)
plt.plot(combined_observations[:, 0], label='Price Movements', linewidth=2)

# Plot action dots
for t, action in enumerate(action_arr):
   
    if action == 0:  # Buy: Colored: Green
        plt.scatter(t, combined_observations[t, 0], color='green', s=100)
    elif action == 1:  # Sell: Colored: Red
        plt.scatter(t, combined_observations[t, 0], color='red', s=100)
    elif action == 2:  # Hold: Colored: Blue
        plt.scatter(t, combined_observations[t, 0], color='blue', s=100)

plt.xlabel('Time step')
plt.ylabel('Price')
plt.title('Price Movements with Actions')
plt.legend()
plt.grid()

plt.show()

To save your trained model, just do this,



model.save('/content/drive/MyDrive/Trado AI/Models/TradoV3')
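
When you need the agent again later, you can load it straight back from the same path (a minimal sketch; attaching the environment with set_env is only needed if you plan to continue training):

# Load the saved agent back from Drive.
loaded_model = RecurrentPPO.load('/content/drive/MyDrive/Trado AI/Models/TradoV3')
loaded_model.set_env(TradingEnv(cleaned_df))  # optional: only required for further training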

If you want the pre-trained model and dataset, visit this drive link. TradoV3 is trained on a few more datasets, so when using the pre-trained model, you'll notice some differences in performance between v2 and v3.