Writing a Data Transformation Pipeline Using Go
Posted: 20/03/2024

In this article, I will show you how to implement a simple data transformation pipeline using the Go programming language.


Prerequisites

You should read/complete the EXTRACT DATA USING WEB SCRAPING WITH PYTHON article first, because this article builds on its output.


Content

  1. What is data processing?
  2. Why use Go for data processing?
  3. Let’s get started!


What is data processing?

  • Data transformation is the process of converting data from one format, such as a CSV file or Excel spreadsheet, into another.


  • Transformations typically involve converting a raw data source into a cleansed, validated, ready-to-use format. Data transformation is crucial to data management processes such as data integration, data migration, data warehousing, and data preparation.
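As a tiny illustration of that idea (the record type and field names here are my own, not part of the pipeline built below), a transformation step might turn raw string fields into a typed, validated record:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// Product is a hypothetical cleaned record produced from a raw CSV row.
type Product struct {
	Name  string
	Price int
}

// Transform cleans and validates one raw row: it trims whitespace and
// converts the price column from string to int, defaulting to 0 when
// the value is not a valid number.
func Transform(row []string) Product {
	price, err := strconv.Atoi(strings.TrimSpace(row[1]))
	if err != nil {
		price = 0
	}
	return Product{
		Name:  strings.TrimSpace(row[0]),
		Price: price,
	}
}

func main() {
	raw := []string{"  Coffee  ", " 120 "}
	fmt.Println(Transform(raw)) // {Coffee 120}
}
```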


Why use Go for data processing?

Reference: Supercharging Data Processing with Go: A Comprehensive Guide


  • Performance: Go is known for its exceptional performance, thanks to its compiled nature and efficient garbage collection. This makes it well-suited for handling large volumes of data quickly and efficiently. It also benefits from low-level control, which allows developers to optimize data processing algorithms for their specific needs.


  • Concurrency: Concurrency is a first-class citizen in Go. Goroutines and channels make it easy to write concurrent and parallel programs, making Go an ideal choice for tasks that require handling multiple data streams or parallel processing.


  • Simplicity: Go's simplicity in syntax and language design makes it accessible to both experienced developers and those new to the language. This simplicity extends to data processing, allowing developers to focus on the task at hand rather than wrestling with complex language features.
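To see why goroutines and channels fit this kind of work, here is a minimal, self-contained worker-pool sketch (the integer "records" and the `square` workload are invented for illustration; a real pipeline would transform rows instead):

```go
package main

import (
	"fmt"
	"sync"
)

// square stands in for a per-record transformation step.
func square(n int) int { return n * n }

// processAll fans records out to `workers` goroutines over a channel
// and collects the transformed results. Result order is not guaranteed.
func processAll(records []int, workers int) []int {
	jobs := make(chan int)
	results := make(chan int, len(records))

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := range jobs {
				results <- square(n)
			}
		}()
	}

	for _, r := range records {
		jobs <- r
	}
	close(jobs)

	wg.Wait()
	close(results)

	out := make([]int, 0, len(records))
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	out := processAll([]int{1, 2, 3, 4}, 2)
	sum := 0
	for _, v := range out {
		sum += v
	}
	fmt.Println(len(out), sum) // 4 30
}
```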


Let’s get started!

  • Create a new directory named data-transformation:
mkdir data-transformation


  • Then, move into the new folder:
cd data-transformation


  • Create a go.mod file with the go mod command. Note that the module name must match the import paths used in the code below (agapifa-data-transformation):
go mod init agapifa-data-transformation


  • The go.mod file is then created with the following content:
module agapifa-data-transformation

go 1.21.4


  • Next, create a main.go file:
package main

import (
  "os"

  "agapifa-data-transformation/config"
  transformation_data_core "agapifa-data-transformation/core"
  "agapifa-data-transformation/utils"

  _ "github.com/go-sql-driver/mysql"
  "github.com/rs/zerolog/log"
)

func main() {
  // The script expects the S3 bucket name and the file key as arguments.
  if len(os.Args) < 3 {
    log.Fatal().Msg("usage: go run main.go <bucket-name> <file-key>")
  }

  bucketName := os.Args[1]
  fileKey := os.Args[2]

  config.LoadConfig(".")

  db := utils.ConnectDB()

  utils.DownloadFromS3Bucket(bucketName, fileKey)

  transformation_data_core.Exec(bucketName+"/"+fileKey, db)
}


  • Create a config/config.go file:
package config

import (
  "github.com/rs/zerolog/log"

  "github.com/spf13/viper"
)

type Config struct {
  Environment           string `mapstructure:"ENVIRONMENT"`
  AWS_REGION            string `mapstructure:"AWS_REGION"`
  AWS_SECRET_ACCESS_KEY string `mapstructure:"AWS_SECRET_ACCESS_KEY"`
  AWS_ACCESS_KEY_ID     string `mapstructure:"AWS_ACCESS_KEY_ID"`
  MYSQL_HOST            string `mapstructure:"MYSQL_HOST"`
  MYSQL_PORT            string `mapstructure:"MYSQL_PORT"`
  MYSQL_DATABASE        string `mapstructure:"MYSQL_DATABASE"`
  MYSQL_USERNAME        string `mapstructure:"MYSQL_USERNAME"`
  MYSQL_PASSWORD        string `mapstructure:"MYSQL_PASSWORD"`
}

func LoadConfig(path string) (config Config, err error) {
  viper.AddConfigPath(path)
  viper.SetConfigName("app")
  viper.SetConfigType("env")

  viper.AutomaticEnv()

  err = viper.ReadInConfig()
  if err != nil {
    log.Fatal().Err(err).Msg("cannot load config")
  }

  err = viper.Unmarshal(&config)
  return
}


  • Create a utils folder with two files. First, the mysql.go file:
package utils

import (
  "agapifa-data-transformation/config"
  "database/sql"
  "fmt"

  _ "github.com/go-sql-driver/mysql"
  "github.com/rs/zerolog/log"
)

func ConnectDB() *sql.DB {
  config, err := config.LoadConfig(".")
  if err != nil {
    log.Fatal().Err(err).Msg("Cannot load config")
  }

  dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", config.MYSQL_USERNAME, config.MYSQL_PASSWORD, config.MYSQL_HOST, config.MYSQL_PORT, config.MYSQL_DATABASE)
  db, err := sql.Open("mysql", dsn)
  if err != nil {
    log.Fatal().Err(err).Msg("Error when opening DB")
  }
  // sql.Open only validates the DSN; Ping verifies the connection.
  if err := db.Ping(); err != nil {
    log.Fatal().Err(err).Msg("Cannot connect to DB")
  }

  return db
}


  • aws_s3.go file
package utils

import (
  "agapifa-data-transformation/config"
  "fmt"
  "os"

  "github.com/rs/zerolog/log"

  "github.com/aws/aws-sdk-go/aws"
  "github.com/aws/aws-sdk-go/aws/session"
  "github.com/aws/aws-sdk-go/service/s3"
  "github.com/aws/aws-sdk-go/service/s3/s3manager"
)

func DownloadFromS3Bucket(bucket string, filePath string) {
  config, _ := config.LoadConfig(".")

  if _, err := os.Stat(bucket); os.IsNotExist(err) {
    os.Mkdir(bucket, os.ModePerm)
  }

  file, err := os.Create(bucket + "/" + filePath)
  if err != nil {
    log.Fatal().Err(err).Msg("Error when creating the local file")
  }
  defer file.Close()

  sess, err := session.NewSession(&aws.Config{Region: aws.String(config.AWS_REGION)})
  if err != nil {
    log.Fatal().Err(err).Msg("Error when creating the AWS session")
  }

  downloader := s3manager.NewDownloader(sess)
  numBytes, err := downloader.Download(file,
    &s3.GetObjectInput{
      Bucket: aws.String(bucket),
      Key:    aws.String(filePath),
    })
  if err != nil {
    log.Fatal().Err(err).Msg("Error when downloading the file from S3")
  }

  fmt.Println("Downloaded", file.Name(), numBytes, "bytes")
}


  • Next, create a core/transformation_data.go file:
package transformation_data_core

import (
  "database/sql"
  "encoding/csv"
  "os"
  "strconv"

  "github.com/rs/zerolog/log"
)

type CsvRecord struct {
  Name        string
  Price       int
  Description string
  Image       string
}

func openFile(filePath string) *os.File {
  f, err := os.Open(filePath)
  if err != nil {
    log.Fatal().Err(err).Msg("Opening csv file has an error")
  }

  return f
}

func readFile(file *os.File) [][]string {
  csvReader := csv.NewReader(file)
  csvReader.Comma = ','
  data, err := csvReader.ReadAll()

  if err != nil {
    log.Fatal().Err(err).Msg("Reading csv file has an error")
  }

  return data
}

func getProductList(data [][]string) []CsvRecord {
  var productList []CsvRecord
  // Skip the header row, then map each CSV column to a struct field.
  for i, line := range data {
    if i == 0 {
      continue
    }

    var rec CsvRecord
    for j, field := range line {
      switch j {
      case 0:
        rec.Name = field
      case 1:
        price, err := strconv.Atoi(field)
        if err != nil {
          price = 0
        }
        rec.Price = price
      case 2:
        rec.Description = field
      case 3:
        rec.Image = field
      }
    }
    productList = append(productList, rec)
  }

  return productList
}


func upsertProducts(db *sql.DB, productList []CsvRecord) {
  for _, p := range productList {
    var productId sql.NullString

    db.QueryRow("SELECT id FROM product WHERE name = ?", p.Name).Scan(&productId)

    if productId.String == "" {
      // Use Exec for statements that do not return rows.
      _, err := db.Exec(`
        INSERT IGNORE INTO product(name, description, price)
        VALUES(?, ?, ?)
        `,
        p.Name,
        p.Description,
        p.Price,
      )
      if err != nil {
        log.Fatal().Err(err).Msg("Insert product has an error")
      }
    }
  }
}

func Exec(filePath string, db *sql.DB) {
  file := openFile(filePath)
  defer file.Close()
  defer db.Close()

  data := readFile(file)
  productList := getProductList(data)

  upsertProducts(db, productList)
}


  • Add an app.env file (LoadConfig looks for a config named app with the env type):
ENVIRONMENT=development

# AWS
AWS_REGION=
AWS_SECRET_ACCESS_KEY=
AWS_ACCESS_KEY_ID=

# MySQL
MYSQL_HOST=
MYSQL_PORT=
MYSQL_DATABASE=
MYSQL_USERNAME=
MYSQL_PASSWORD=


Execute the script

go run main.go <bucket-name> <file-key>

The two arguments are the S3 bucket name and the key of the CSV file to download.


Conclusion


In this post, we’ve looked at what data transformation is, why Go is a good fit for it, and how to build a simple pipeline that downloads a CSV file from S3, parses it, and upserts the records into MySQL.

Good luck, and I hope this post was of value to you!
