Load Apache Access Logs from an AWS S3 Bucket into an AWS RDS MySQL Instance Using PySpark and an AWS Glue Job | ETL Pipeline

A typical Apache access log entry looks like this:

13.66.139.0 - - [19/Dec/2020:13:57:26 +0100] "GET /index.php?option=com_phocagallery&view=category&id=1:almhuette-raith&Itemid=53 HTTP/1.1" 200 32653 "-" "Mozilla/5.0 (compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm)" "-"

Each entry breaks down into the following fields:
  1. IP Address
  2. Client ID
  3. User ID
  4. Date and Time of the request
  5. Request Line (Method, Path, Query String and Protocol)
  6. Status Code
  7. Content Size (Size of the content returned to the client)
  8. Referrer
  9. User Agent HTTP Header
regex = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)\s*" (\d{3}) (\S+)'
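As a quick sanity check, the pattern above can be run against the sample entry on its own. This is a minimal sketch; the variable names are illustrative, not part of the Glue job:

```python
import re

# The log-parsing pattern from above (a raw string avoids escape warnings).
APACHE_LOG_REGEX = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)\s*" (\d{3}) (\S+)'

sample = ('13.66.139.0 - - [19/Dec/2020:13:57:26 +0100] '
          '"GET /index.php?option=com_phocagallery&view=category&id=1:almhuette-raith&Itemid=53 HTTP/1.1" '
          '200 32653 "-" "Mozilla/5.0 (compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm)" "-"')

match = re.search(APACHE_LOG_REGEX, sample)
print(match.group(1))  # host: 13.66.139.0
print(match.group(5))  # method: GET
print(match.group(8))  # status code: 200
print(match.group(9))  # content size: 32653
```

Note that the regex captures nine groups (host through content size); the referrer and user-agent fields at the end of the line are left unmatched and ignored.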
ETL Pipeline Architecture
import boto3
import pandas as pd
import sys
import re
from pyspark.sql import Row
from pyspark.sql import SparkSession
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
def parseApacheLogLine(logline):
    match = re.search(regex, logline)
    if match is None:
        # Lines that do not match the pattern get sentinel values.
        return Row(
            host = '-',
            client_identd = '-',
            user_id = '-',
            date_time = '-',
            method = '-',
            endpoint = '-',
            protocol = '-',
            response_code = -1,
            content_size = 0
        )
    # A '-' content size means nothing was returned to the client.
    size_field = match.group(9)
    if size_field == '-':
        size = 0
    else:
        size = int(size_field)
    return Row(
        host = match.group(1),
        client_identd = match.group(2),
        user_id = match.group(3),
        date_time = match.group(4),
        method = match.group(5),
        endpoint = match.group(6),
        protocol = match.group(7),
        response_code = int(match.group(8)),
        content_size = size
    )
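The parsing logic can be checked without a Spark cluster by exercising the same function locally, with a plain `namedtuple` standing in for `pyspark.sql.Row` (the names here are illustrative):

```python
import re
from collections import namedtuple

regex = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)\s*" (\d{3}) (\S+)'

# Stand-in for pyspark.sql.Row so the parser can run without Spark.
LogRow = namedtuple('LogRow', ['host', 'client_identd', 'user_id', 'date_time',
                               'method', 'endpoint', 'protocol',
                               'response_code', 'content_size'])

def parse_line(logline):
    match = re.search(regex, logline)
    if match is None:
        # Malformed lines fall back to sentinel values, as in the Glue job.
        return LogRow('-', '-', '-', '-', '-', '-', '-', -1, 0)
    size = 0 if match.group(9) == '-' else int(match.group(9))
    return LogRow(match.group(1), match.group(2), match.group(3),
                  match.group(4), match.group(5), match.group(6),
                  match.group(7), int(match.group(8)), size)

good = parse_line('13.66.139.0 - - [19/Dec/2020:13:57:26 +0100] "GET /index.php HTTP/1.1" 200 32653')
bad = parse_line('not a log line')
print(good.response_code, good.content_size)  # 200 32653
print(bad.response_code)                      # -1
```

Because unmatched lines return a full row of sentinel values rather than raising, a single corrupt line in the bucket cannot fail the whole job; bad rows can be filtered out later with `response_code = -1`.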
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
client = boto3.client('s3')
path = 'YOUR BUCKET PATH'  # e.g. 's3://<your-bucket>/<prefix>/'
parsed_logs = (sc
    .textFile(path)
    .map(parseApacheLogLine)
    .cache())
logDataset = spark.createDataFrame(parsed_logs)
glue_df = DynamicFrame.fromDF(logDataset, glueContext, "dynaFrame")
output = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame = glue_df,
    catalog_connection = "<Your RDS Connection Name>",
    connection_options = {
        "database": "<Your Database Name>",
        "dbtable": "<Your Table Name>"
    })
job.commit()
spark.stop()
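Before running the job, the target table must already exist in the RDS MySQL database. One possible schema matching the parsed Row fields (the table name and column sizes are assumptions; adjust them to your data):

```sql
CREATE TABLE apache_access_logs (
    host          VARCHAR(255),
    client_identd VARCHAR(64),
    user_id       VARCHAR(64),
    date_time     VARCHAR(32),
    method        VARCHAR(16),
    endpoint      TEXT,
    protocol      VARCHAR(16),
    response_code INT,
    content_size  BIGINT
);
```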

Sarthak Dhawan

Computer Engineer - Thapar Institute of Engineering and Technology