Load Apache Access Logs from an AWS S3 Bucket to an AWS RDS MySQL Instance Using PySpark and an AWS Glue Job | ETL Pipeline

A raw Apache access log line looks like this:

13.66.139.0 - - [19/Dec/2020:13:57:26 +0100] "GET /index.php?option=com_phocagallery&view=category&id=1:almhuette-raith&Itemid=53 HTTP/1.1" 200 32653 "-" "Mozilla/5.0 (compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm)" "-"
Each line can be split into its fields with the following regular expression (note the raw string, so the backslashes reach the regex engine intact):

regex = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)\s*" (\d{3}) (\S+)'
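To sanity-check the pattern, you can run it against the sample line above in a plain Python shell, no Spark required:

```python
import re

# Same pattern as above: host, identd, user, timestamp, method,
# endpoint, protocol, status code, response size
regex = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)\s*" (\d{3}) (\S+)'

sample = ('13.66.139.0 - - [19/Dec/2020:13:57:26 +0100] '
          '"GET /index.php?option=com_phocagallery&view=category'
          '&id=1:almhuette-raith&Itemid=53 HTTP/1.1" 200 32653 "-" '
          '"Mozilla/5.0 (compatible; bingbot/2.0; '
          '+http://www.bing.com/bingbot.htm)" "-"')

match = re.search(regex, sample)
print(match.group(1))  # host: 13.66.139.0
print(match.group(5))  # method: GET
print(match.group(8))  # status code: 200
print(match.group(9))  # content size: 32653
```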
[Diagram: ETL Pipeline Architecture]
import boto3
import sys
import re
from pyspark.sql import Row
from pyspark.sql import SparkSession
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
def parseApacheLogLine(logline):
    match = re.search(regex, logline)
    if match is None:
        # Line did not match the pattern: return sentinel values
        return Row(
            host='-',
            client_identd='-',
            user_id='-',
            date_time='-',
            method='-',
            endpoint='-',
            protocol='-',
            response_code=-1,
            content_size=0
        )
    # Apache logs '-' for the size when the response carried no body
    size_field = match.group(9)
    size = 0 if size_field == '-' else int(size_field)
    return Row(
        host=match.group(1),
        client_identd=match.group(2),
        user_id=match.group(3),
        date_time=match.group(4),
        method=match.group(5),
        endpoint=match.group(6),
        protocol=match.group(7),
        response_code=int(match.group(8)),
        content_size=size
    )
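The '-' fallback for content_size matters on real logs, where error responses often report no size. Here is a dependency-free check of the same parse logic, using a plain dict in place of a pyspark Row purely for illustration:

```python
import re

regex = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)\s*" (\d{3}) (\S+)'

def parse_line(logline):
    # Mirrors parseApacheLogLine above, but returns a dict instead of a Row
    match = re.search(regex, logline)
    if match is None:
        # Unparsable line: sentinel values, flagged by response_code = -1
        return {'host': '-', 'response_code': -1, 'content_size': 0}
    size_field = match.group(9)
    size = 0 if size_field == '-' else int(size_field)
    return {'host': match.group(1),
            'response_code': int(match.group(8)),
            'content_size': size}

# A 404 with no body: Apache logs the size as '-'
no_size = '10.0.0.1 - - [19/Dec/2020:14:00:00 +0100] "GET /missing HTTP/1.1" 404 -'
print(parse_line(no_size))            # content_size falls back to 0
print(parse_line('not a log line'))   # response_code is -1
```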
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
client = boto3.client('s3')
path = 'YOUR BUCKET PATH'

Bucket path format:
s3://bucket_name/file_name
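Spark's textFile accepts the s3:// URI directly, but boto3 calls want the bucket and key separately. If you want to validate the path before the job runs, a small helper can split it (split_s3_path is a hypothetical helper, not part of any AWS SDK):

```python
def split_s3_path(path):
    """Split 's3://bucket_name/file_name' into (bucket, key)."""
    if not path.startswith('s3://'):
        raise ValueError('expected an s3:// URI, got: %s' % path)
    bucket, _, key = path[len('s3://'):].partition('/')
    return bucket, key

bucket, key = split_s3_path('s3://my-log-bucket/access.log')
print(bucket, key)  # my-log-bucket access.log
# e.g. client.head_object(Bucket=bucket, Key=key) would confirm the file exists
```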

parsed_logs = (sc
.textFile(path)
.map(parseApacheLogLine)
.cache())
logDataset = spark.createDataFrame(parsed_logs)
glue_df=DynamicFrame.fromDF(logDataset, glueContext, "dynaFrame")

Now it's time to load the data into our RDS instance, but first there are some prerequisites:

1. Make sure your RDS instance is publicly accessible.

2. Create a Glue connection to your RDS instance.

3. Define a Table in your RDS Database for loading the data.
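For step 3, the table needs one column per field of the Row built above. A possible DDL sketch (table name and column types are suggestions; adjust to your schema):

```sql
CREATE TABLE apache_access_logs (
    host          VARCHAR(255),
    client_identd VARCHAR(64),
    user_id       VARCHAR(64),
    date_time     VARCHAR(64),
    method        VARCHAR(16),
    endpoint      TEXT,
    protocol      VARCHAR(16),
    response_code INT,
    content_size  BIGINT
);
```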

output = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=glue_df,  # the DynamicFrame object itself, not a string
    catalog_connection="<Your RDS Connection Name>",
    connection_options={
        "database": "<Your Database Name>",
        "dbtable": "<Your Table Name>"
    }
)
job.commit()
spark.stop()
