In this article, I will show how to implement data processing using the Go programming language with a simple tutorial.
Before following this article, you should read and complete the EXTRACT DATA USING WEB SCRAPING WITH PYTHON article, because this article builds on it as the next step.
Document ref: Supercharging Data Processing with Go: A Comprehensive Guide
mkdir data-transformation
cd data-transformation
go mod init agapifa-data-transformation
module agapifa-data-transformation

go 1.21.4
// Command data-transformation takes a bucket name and a file key on the
// command line, downloads that object, and hands the downloaded file to the
// transformation core together with a MySQL handle.
package main

import (
	"agapifa-data-transformation/config"
	transformation_data_core "agapifa-data-transformation/core"
	aws_s3_utils "agapifa-data-transformation/utils"
	mysql_utils "agapifa-data-transformation/utils"
	"os"

	_ "github.com/go-sql-driver/mysql"
)

// main expects exactly two positional arguments: the S3 bucket name and the
// key of the object to process.
func main() {
	// Indexing os.Args without this check panics when an argument is missing.
	if len(os.Args) < 3 {
		os.Stderr.WriteString("usage: data-transformation <bucket-name> <file-key>\n")
		os.Exit(2)
	}
	bucketName := os.Args[1]
	fileKey := os.Args[2]

	// Fail early on an unusable configuration instead of discarding the error.
	if _, err := config.LoadConfig("."); err != nil {
		os.Stderr.WriteString("cannot load config: " + err.Error() + "\n")
		os.Exit(1)
	}

	db := mysql_utils.ConnectDB()

	aws_s3_utils.DownloadFromS3Bucket(bucketName, fileKey)

	// The path passed here is built from the same bucket name and key that
	// were handed to the downloader.
	transformation_data_core.Exec(bucketName+"/"+fileKey, db)
}
package config import ( "github.com/rs/zerolog/log" "github.com/spf13/viper" ) type Config struct { Environment string `mapstructure:"ENVIRONMENT"` AWS_REGION string `mapstructure:"AWS_REGION"` AWS_SECRET_ACCESS_KEY string `mapstructure:"AWS_SECRET_ACCESS_KEY"` AWS_ACCESS_KEY_ID string `mapstructure:"AWS_ACCESS_KEY_ID"` MYSQL_HOST string `mapstructure:"MYSQL_HOST"` MYSQL_PORT string `mapstructure:"MYSQL_PORT"` MYSQL_DATABASE string `mapstructure:"MYSQL_DATABASE"` MYSQL_USERNAME string `mapstructure:"MYSQL_USERNAME"` MYSQL_PASSWORD string `mapstructure:"MYSQL_PASSWORD"` } func LoadConfig(path string) (config Config, err error) { viper.AddConfigPath(path) viper.SetConfigName("app") viper.SetConfigType("env") viper.AutomaticEnv() err = viper.ReadInConfig() if err != nil { log.Fatal().Err(err).Msg("cannot load config") } err = viper.Unmarshal(&config) return }
package utils

import (
	"agapifa-data-transformation/config"
	"database/sql"
	"fmt"

	_ "github.com/go-sql-driver/mysql"
	"github.com/rs/zerolog/log"
)

// ConnectDB loads the configuration from the current directory, opens a MySQL
// handle built from the MYSQL_* settings and verifies that the server is
// reachable.
//
// Any failure (config, DSN, or connectivity) is logged at fatal level, so a
// returned handle is ready for use. The caller owns the handle and is
// responsible for closing it.
func ConnectDB() *sql.DB {
	// Named cfg so the local does not shadow the config package.
	cfg, err := config.LoadConfig(".")
	if err != nil {
		log.Fatal().Err(err).Msg("Error when loading config")
	}

	dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s",
		cfg.MYSQL_USERNAME,
		cfg.MYSQL_PASSWORD,
		cfg.MYSQL_HOST,
		cfg.MYSQL_PORT,
		cfg.MYSQL_DATABASE,
	)

	db, err := sql.Open("mysql", dsn)
	if err != nil {
		log.Fatal().Err(err).Msg("Error when opening DB")
	}

	// sql.Open may only validate its arguments; Ping forces a real connection
	// so that a wrong host or bad credentials are reported here rather than on
	// the first query.
	if err := db.Ping(); err != nil {
		log.Fatal().Err(err).Msg("Error when connecting to DB")
	}

	return db
}
package utils

import (
	"agapifa-data-transformation/config"
	"fmt"
	"os"
	"path/filepath"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	"github.com/rs/zerolog/log"
)

// DownloadFromS3Bucket downloads the object filePath from the S3 bucket named
// bucket into the local file <bucket>/<filePath>, creating any missing parent
// directories.
//
// The AWS region comes from the configuration in the current directory. Every
// failure is logged at fatal level, so when the function returns the file has
// been downloaded.
func DownloadFromS3Bucket(bucket string, filePath string) {
	// Named cfg so the local does not shadow the config package.
	cfg, err := config.LoadConfig(".")
	if err != nil {
		log.Fatal().Err(err).Msg("Error when loading config")
	}

	localPath := filepath.Join(bucket, filePath)

	// MkdirAll also covers keys that contain "/" (nested prefixes), which a
	// single Mkdir of the bucket directory does not.
	if err := os.MkdirAll(filepath.Dir(localPath), os.ModePerm); err != nil {
		log.Fatal().Err(err).Msg("Error when creating download directory")
	}

	file, err := os.Create(localPath)
	if err != nil {
		log.Fatal().Err(err).Msg("Error when creating local file")
	}
	defer file.Close()

	sess, err := session.NewSession(&aws.Config{Region: aws.String(cfg.AWS_REGION)})
	if err != nil {
		log.Fatal().Err(err).Msg("Error when creating AWS session")
	}

	downloader := s3manager.NewDownloader(sess)
	numBytes, err := downloader.Download(file, &s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(filePath),
	})
	if err != nil {
		// Fatal skips deferred calls, so close explicitly and remove the
		// partial file; otherwise a later run could mistake it for real data.
		file.Close()
		os.Remove(localPath)
		log.Fatal().Err(err).Msg("Error when downloading file from S3")
	}

	fmt.Println("Downloaded", file.Name(), numBytes, "bytes")
}
// Package transformation_data_core reads product rows from a CSV file and
// inserts the ones that are not yet present into the product table.
package transformation_data_core

import (
	"database/sql"
	"encoding/csv"
	"errors"
	"os"
	"strconv"

	"github.com/rs/zerolog/log"
)

// CsvRecord is one product row of the CSV file. The columns are read in the
// order name, price, description, image.
type CsvRecord struct {
	Name        string
	Price       int
	Description string
	Image       string
}

// openFile opens filePath for reading and logs a fatal error on failure.
func openFile(filePath string) *os.File {
	f, err := os.Open(filePath)
	if err != nil {
		log.Fatal().Err(err).Msg("Opening csv file has an error")
	}

	return f
}

// readFile parses the whole comma-separated file into rows of fields and logs
// a fatal error when the content is not valid CSV.
func readFile(file *os.File) [][]string {
	csvReader := csv.NewReader(file)
	csvReader.Comma = ','

	data, err := csvReader.ReadAll()
	if err != nil {
		log.Fatal().Err(err).Msg("Reading csv file has an error")
	}

	return data
}

// getProductList converts the parsed rows into records. The first row is
// skipped (presumably a header row; confirm against the producer of the
// file). Columns beyond the fourth are ignored and missing columns leave the
// corresponding field at its zero value.
func getProductList(data [][]string) []CsvRecord {
	if len(data) <= 1 {
		return nil
	}

	productList := make([]CsvRecord, 0, len(data)-1)
	for _, line := range data[1:] {
		var rec CsvRecord
		for col, field := range line {
			switch col {
			case 0:
				rec.Name = field
			case 1:
				// A price that is not a valid integer is stored as 0.
				if price, err := strconv.Atoi(field); err == nil {
					rec.Price = price
				}
			case 2:
				rec.Description = field
			case 3:
				rec.Image = field
			}
		}
		productList = append(productList, rec)
	}

	return productList
}

// upsertProducts inserts every product whose name has no row with a non-empty
// id in the product table. Existing products are left untouched. Any database
// error is logged at fatal level.
func upsertProducts(db *sql.DB, productList []CsvRecord) {
	for _, p := range productList {
		var productID sql.NullString
		err := db.QueryRow("SELECT id FROM product WHERE name = ?", p.Name).Scan(&productID)
		// sql.ErrNoRows just means the product is new; anything else is a real
		// failure that must not be mistaken for "not found".
		if err != nil && !errors.Is(err, sql.ErrNoRows) {
			log.Fatal().Err(err).Msg("Select product has an error")
		}
		if productID.String != "" {
			continue
		}

		// Exec is the right call for a statement that returns no rows; Query
		// would hold a connection until its result set is closed.
		_, err = db.Exec(
			`INSERT IGNORE INTO product(name, description, price) VALUES(?, ?, ?)`,
			p.Name,
			p.Description,
			p.Price,
		)
		if err != nil {
			log.Fatal().Err(err).Msg("Insert product has an error")
		}
	}
}

// Exec loads the CSV file at file_path and inserts its products through db.
// It closes both the file and db before returning, so the caller must not use
// db afterwards.
func Exec(file_path string, db *sql.DB) {
	// Register the cleanups as soon as each resource is held, not after the
	// work is already done.
	defer db.Close()

	file := openFile(file_path)
	defer file.Close()

	data := readFile(file)
	productList := getProductList(data)
	upsertProducts(db, productList)
}
ENVIRONMENT=development

# AWS
AWS_REGION=
AWS_SECRET_ACCESS_KEY=
AWS_ACCESS_KEY_ID=

# MySQL
MYSQL_HOST=
MYSQL_PORT=
MYSQL_DATABASE=
MYSQL_USERNAME=
MYSQL_PASSWORD=
go run main.go <bucket-name> <file-key>
In this post, we’ve looked at how to process data with Go: downloading a CSV file from an S3 bucket, parsing its rows, and inserting the products into MySQL.
Good luck, and I hope this post is of value to you!