From Aither Labs

Part 3: Build Your Own Crypto Trading Platform using AWS Lambda – Adding the Trading BUY Evaluation Engine

In parts 1 and 2, we build the infrastructure for monitoring real-time prices and storing them in the database. In part 3, we will add a Trading Engine that will analyze each datapoint and then decide whether or not to execute a trade. For now, we will just print output to the console and return model_id values to the calling function.

To accomplish this, we need to do the following:

1. Define the “pools” of money we will use to trade

2. Define the models, or criteria, that we will use to make trades.

3. Create a stored procedure to evaluate inbound prices and models

4. Modify our getPrices function to stick incoming prices into the evaluate queue

Defining Trading Pools

The first thing we need to do is to be able to define how much money we want to use to trade each symbol. We will call this a “pool”.

Each pool will have its own unique ID, a specific starting dollar amount, and assigned exchange/symbol combination. I have also added a backtesting flag so that we can create models for backtesting only. Note that I intentionally did not make pool_id auto increment as they should be relatively static and this also makes it easier to port values between my development environment and production. Here is the SQL for creating the trading_pools table:

use aither_crypto;

drop table if exists trading_pools;

create table trading_pools
(
    pool_id		bigint unsigned not null unique,
    exchange		varchar(20),
    symbol		varchar(10),
    total_amount	decimal(10,2),
    is_backtest		tinyint(1) default 0,
    primary key (pool_id)
);

Now let’s insert our first trading pool to use – $10,000 to trade Bitcoin! I am using 0 as an arbitrary pool_id that I chose:

insert into trading_pools (pool_id, exchange, symbol, total_amount, is_backtest)
values (0, 'coinbase', 'BTC-USD', 10000, 0);

Defining Trading Models

The second thing we need to do is define a structure that will let us define different trading models we want to use to spend our trading pools (for example, “buy when there has been a 2% loss in BTC during the last 24 hours”). We will call this a trading “model”. Each model will have a stored procedure associated with it the will return either a BUY recommendation or no action.

We are going to create a simple table structure to hold simple models that trades purely on percent gain/loss. Feel free to modify this structure to make it work for your needs.

Our trading_models table is defined as:

use aither_crypto;

drop table if exists trading_models;

create table trading_models
(
    model_id			varchar(50) not null unique,
    exchange			varchar(20),
    symbol			varchar(10),
    proc_name_buy		varchar(255),
    target_level_buy		decimal(10,2)	default null,
    enabled			tinyint		default 1,
    primary key (model_id)
);

Again, the model_id is an arbitrary, unique identifier that you define. proc_name is the stored procedure that will contain the guts of the trading model. Each stored procedure will have a standard definition that takes in a price point, model_id, and pool_id to decide if a trade should be made. Finally, target_level_buy, take_profit_percentage, and stop_loss_percent define how our model will enter and exit the market. (If we set target_level_buy to null, then a buy will be initiated at the current market price).

Let’s go ahead and create a model in our table:

insert into trading_models (model_id, exchange, symbol, proc_name_buy, target_level_buy)
values ('CB_BTC_AITHERCRYPTO', 'coinbase', 'BTC-USD', 'sp_evaluate_buy_CB_BTC_AITHERCRYPTO', 27525.00);

Notice that as part of this insert, I added the stored procedure name sp_evaluate_buy_CB_BTC_AITHERCRYPTO. We will build this stored procedure shortly.

Relating Models to Pools

Now that we have created our models and pools, we need to assign trading models to each pool. The data structure allows you to assign multiple models to a single pool, but for our purposes we will keep a 1:1 ratio for models to pools (again, feel free to add complexity on your own).

We will use a simple join table to assign trading models to a pool:

drop table if exists trading_pool_models;

create table trading_pool_models
(
    pool_id				bigint,
    model_id				varchar(50),
    primary key (pool_id, model_id)
);

And, inserting a relationship between the pool and model we created above:

insert into trading_pool_models(pool_id, model_id)
values(0, 'CB_BTC_AITHERCRYPTO');

The above relationship means that the trading model CB_BTC_AITHERCRYPTO will use pool 0 for executing trades.

Creating the Stored Procedures for Model Evaluation

As we see in the trading_models table above, each model will have an evaluation stored procedure defined to determine if a BUY should be executed. The stored proc will have a standard input definition of date time and model_id. From that information, the proc will look up details of the model and pool to decide if a trade should be made.

The buy stored procedure we need to create for our lone model is sp_evaluate_buy_CB_BTC_AITHERCRYPTO (the one we added in the INSERT statement above).

This procedure will check the conditions we have set in the models table against the current price (see blog posts 1 and 2) and make a recommendation on whether to buy or not. In this case, we just want to check if the last price is lower than the target level at which the model says to buy.

use aither_crypto;

drop procedure if exists sp_evaluate_buy_CB_BTC_AITHERCRYPTO;

delimiter $$
create procedure sp_evaluate_buy_CB_BTC_AITHERCRYPTO
(
	locDate				datetime,
    locModelID			varchar(50)
)
this_proc:begin

    declare dLastPrice				decimal(10,2);
    declare dTargetLevelBuy			decimal(10,2);
    declare sExchange 				varchar(20);
    declare sSymbol 				varchar(10);
    
    -- Get model details
    select 
	m.exchange, m.symbol, m.target_level_buy
        into @sExchange, @sSymbol, @dTargetLevelBuy
	from trading_models m
    where 
		m.model_id=locModelID;
    
    -- Get the last price before the passed in date
    select
	price into @dLastPrice
    from
	price_history
    where
	exchange = @sExchange
	and symbol = @sSymbol
	and created <= locDate
	and created >= date_add(locDate, interval -(1) hour)
    order by created desc
	limit 1;
    
    -- If the last/latest price is less than the model target level buy, then
    -- return a BUY recommendation. 
    select 
	'BUY' as 'action',
	@dLastPrice as 'last_price',
        @dTargetLevelBuy as 'target_price'
    where
	@dLastPrice<@dTargetLevelBuy;
end$$

delimiter ;

We can test this procedure by running a quick test against known data. The image below shows that the last price of BTC-USD was 27393.56.

Our stored procedure checks to see if the price is below 27525.00 (from the target_buy_level of the model) and will return a buy recommendation if the condition is met. We can test this by using a date after the last price:

call sp_evaluate_buy_CB_BTC_AITHERCRYPTO("2023-05-15 18:32:30", "CB_BTC_AITHERCRYPTO");

And the stored procedure returns:

Adding a Lambda Function to Evaluate BUYs

The final step is to connect our chain of Lambda functions to run BUY evaluations for each model and pool.

In Part 1 and Part 2 of this series, we built an engine that pulled and stored prices. In this section we are going to add a second Lambda function that is run after every price pull to determine if a purchase should be made. We will create a generic function called evaluateModels for this purpose. In a later post, we will also use this function to evaluate SELL actions.

NOTE: I am not going to cover the details of creating a Lambda function as we already did this in Part 1.

To send information in to our new Lambda function, we need to define the event message format we are going to pass from getPrices. We will use a simple format with the token symbol, the price, and the timestamp of the price:

{
  "Symbol": "BTC-USD",
  "Price": "27393.56",
  "Timestamp": "2023-05-15 18:32:30"
}

In the same AWS region that the getPrices Lambda function was created in Part 1, create a new Python 3.8 function and call it evaluateModels. Make sure to use the Aither-Crypto-Lambda-Role we created in Part 1.

After the function is created, add the pymysql module the same way that we did in Part 2. We will use this to pull and evaluate our models from the DB.

Here is the full code for the Lambda function:

import boto3
from datetime import datetime
from datetime import timedelta
import time
import json
import pymysql
import urllib.request


#NOTE:  This is a helper function that will parse and return JSON
#from an event message body.  I have found this addresses various 
#scenarios where messages have slightly different formats between
#production SQS and using the Test functionality.
def parse_message_body_json(message):
    
    message_body_json=None
    
    try:
        #Adjust our message body so that it parses correctly
        message_body=message["body"]
        
        if isinstance(message_body, list)==True:
            list_message_body=message['body']
            
            message_body_json = convert_list_to_dict(list_message_body)[0]
            
        elif isinstance(message_body, dict)==True:
            message_body_json=message_body
        
        elif isinstance(message_body, str):
            message_body=message_body.replace("'", "\"")
            message_body=message_body.strip('"')
            
            if message_body.index('[')!=0:
                if message_body.rindex(']')!=len(message_body)-1:
                    message_body="[%s]" % message_body
            
            #print("Debug 1: %s" % str(message_body))
            
            message_body=json.loads(str(message_body))
            
            message_body_json=message_body[0]
            
            
        #print(message_body)
        
    except Exception as error:
        print(error)
        print("Unable to parse JSON.")
        
        #pass
    
    return message_body_json
    
        

def lambda_handler(event, context):
    
    #Store recommendations here to return from the function.
    arr_buy_recommendations=[]
        
    print("Connecting to DB...")
    #Setup our database connection
    db_host="<Your DB hostname here>"
    db_username="crypto"		#OR whatever DB username you created
    db_password="<Your DB password here>"
    conn = pymysql.connect(host=db_host, user=db_username, password=db_password, \
        charset='utf8', db='aither_crypto', cursorclass=pymysql.cursors.DictCursor)
    cur = conn.cursor()
    
    
    print("Parsing symbol/price information from the event object...")
    message_body_json=parse_message_body_json(event)
    print(message_body_json)
    
    print("Getting pool and model combinations for the symbol...")
    sql = """select 
            	p.pool_id,
            	m.model_id,
            	m.exchange,
            	m.symbol,
            	p.total_amount as 'total_pool_amount',
            	m.proc_name_buy
            from
            	trading_models m,
            	trading_pools p,
            	trading_pool_models pm
            where
            	m.enabled = 1
                and p.symbol='%s'
                and p.exchange = m.exchange
        	and p.symbol = m.symbol
        	and pm.pool_id=p.pool_id
        	and pm.model_id=m.model_id;""" % message_body_json["Symbol"]
    #print(sql)
    cur.execute(sql)
    
    result_models = cur.fetchall()

    
    #Evaluate BUY opportunities for each pool/model configuration
    print("Evaluating BUY opportunities...")
    for row in result_models:
        try:
            timestamp=message_body_json["Timestamp"]
            
            print("Evaluating symbol %s at %s against model %s..." % (message_body_json["Symbol"], timestamp, row["model_id"]))
            
            sql = "call %s('%s', '%s');" % (row["proc_name_buy"], timestamp, row["model_id"])
            print(sql)
            
            cur.execute(sql)
        
            eval_result = cur.fetchall()
            
            #If a row is returned from the proc, then that means we want to BUY.
            for eval_row in eval_result:
                print("TODO:  Placing a BUY order for model=%s..." % (row["model_id"]))
                
                arr_buy_recommendations.append({"model_id":row["model_id"]})
                
                #TODO:  Execute Trade Here
                
        except Exception as error:
            print(error)
            continue

    
    #Clean up
    cur.close()
    conn.close()
    

    #Return result 
    return {
        'data': message_body_json,
        'buy_recommendations': arr_buy_recommendations
    }

This code will return the below JSON in the case it detects a BUY:

{
  "data": {
    "Symbol": "BTC-USD",
    "Price": "27393.56",
    "Timestamp": "2023-05-15 18:32:30"
  },
  "buy_recommendations": [
    {
      "model_id": "CB_BTC_AITHERCRYPTO"
    }
  ]
}

The first thing the evaluateModels function does is generate the SQL to pull all of the model_ids we have in the database that match the symbol that we want to check – in this case, “BTC-USD”. We also pull the proc_name_buy column, so we know what stored procedure to call to evaluate the current price data point.

NOTE: We also pull the total_amount we have allocated to the model, but we do not yet calculate how much of the pool is available. We will add this later when we start making and tracking actual purchases and sales.

    print("Getting pool and model combinations for the symbol...")
    sql = """select 
            	p.pool_id,
            	m.model_id,
            	m.exchange,
            	m.symbol,
            	p.total_amount as 'total_pool_amount',
            	m.proc_name_buy
            from
            	trading_models m,
            	trading_pools p,
            	trading_pool_models pm
            where
            	m.enabled = 1
                and p.symbol='%s'
                and p.exchange = m.exchange
        	and p.symbol = m.symbol
        	and pm.pool_id=p.pool_id
        	and pm.model_id=m.model_id;""" % message_body_json["Symbol"]

After we retrieve the defined models, we use the proc_name_buy to generate SQL to call the evaluation proc for each model – I have highlighted the line in red. We have also defined our model evaluation procs to take a timestamp and model_id and pass that information back into the evaluation proc. The evaluation proc will look up the price details based on the symbol and model details based on the model_id.

This may be a little confusing since we pull model information and pass it into a proc which then pulls model information – but the other option is to pull them in the first query above from the trading_models table and then pass all the details around. I find it more flexible and less prone to error to pass around only the basic info needed. For instance, you could modify the trading_models table to have additional parameter columns and you would only need to modify the guts of the stored proc, instead of pulling more variables into Python and modifying the evaluation proc parameters.

print("Evaluating BUY opportunities...")
    for row in result_models:
        try:
            timestamp=message_body_json["Timestamp"]
            
            print("Evaluating symbol %s at %s against model %s..." % (message_body_json["Symbol"], timestamp, row["model_id"]))
            
            sql = "call %s('%s', '%s');" % (row["proc_name_buy"], timestamp, row["model_id"])
            print(sql)
            
            cur.execute(sql)
        
            eval_result = cur.fetchall()

Lastly, we check the result from the evaluation stored procedure and get ready to place an actual BUY. Here we will just print to the console.

            #If a row is returned from the proc, then that means we want to BUY.
            for eval_row in eval_result:
                print("TODO:  Placing a BUY order for model=%s..." % (row["model_id"]))
                
                arr_buy_recommendations.append({"model_id":row["model_id"]})
                
                #TODO:  Execute Trade Here

Adding a Lambda Function to Evaluate BUYs

The last step is to add a synchronous call to our evaluateModels function directly from the getPrices function. The code below executes this call and waits for the function to return a response.

            #Store the result into JSON.  We will return this for now, but use it later.
            arr_single_coin=[]
            dict_single_coin={"Symbol": coin_pair, "Price": spot_details['data']['amount'], "Timestamp":now}
            arr_single_coin.append(dict_single_coin)
            
            lambda_client = boto3.client('lambda')
            evaluateModels_response = lambda_client.invoke(FunctionName="evaluateModels",
                                                  InvocationType='RequestResponse',
                                                  Payload=json.dumps(dict_single_coin))
            
            payload=evaluateModels_response['Payload']
            payload_json=json.loads(payload.read().decode("utf-8"))
            
            print(payload_json)
            
            json_return.append(arr_single_coin)

When you run the getPrices function, the print(payload_json) line will show you the result of the call to the evaluateModels function. In this case, it shows a buy recommendation for our model_id.

{
   "data":{
      "Symbol":"BTC-USD",
      "Price":"27393.56",
      "Timestamp":"2023-05-15 18:32:30"
   },
   "buy_recommendations":[
      {
         "model_id":"CB_BTC_AITHERCRYPTO"
      }
   ]
}

And that is it! Now, every time your getPrices function runs, you will also be checking all of your models to see if they recommend you buy.

You can find all the code for this in our GitHub repository.

In Part 4, we will add in trade logging and build out the functionality to evaluate positions for a SELL action.

Part 3: Build Your Own Crypto Trading Platform using AWS Lambda – Adding the Trading BUY Evaluation Engine