Sepal.Length | Sepal.Width | Petal.Length | Petal.Width | Species
---|---|---|---|---
5.1 | 3.5 | 1.4 | 0.2 | setosa
4.9 | 3 | 1.4 | 0.2 | setosa
4.7 | 3.2. | 1.3 | 0.2 | setosa
4.6 | 3.1 | 1.5 | 0.2 | setosa
5.0 | 3.6 | x23 | 0.2 | setosa
5.4 | 3.9 | 1.7 | 0.4 | setosa
Building data pipelines in Go
When building data pipelines in the ETL (Extract, Transform, Load) style, the goal is to extract data from a source, transform it into a usable format, and load it into a destination for analysis or storage. Go has gained popularity in data engineering due to several key advantages: its simplicity, a rich standard library, and minimal reliance on external dependencies. Go's syntax is straightforward and easy to read, making it easier for teams to maintain and collaborate on code. Additionally, Go's garbage collection and efficient performance make it a solid choice for data-intensive applications.
In this post, we will walk through an example of loading the Iris dataset, transforming it to a “long format,” and handling data errors gracefully along the way.
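To make "long format" concrete before we build the real pipeline, here is a standalone toy sketch (the `pivotRow` helper is illustrative only, not part of the pipeline code below) that turns one wide Iris record into value/item/species triples:

```go
package main

import "fmt"

// pivotRow turns one wide record into long-format lines of
// "value<TAB>item<TAB>species". The name is illustrative only.
func pivotRow(header, row []string) []string {
	species := row[len(row)-1]
	var long []string
	for i, item := range header {
		long = append(long, row[i]+"\t"+item+"\t"+species)
	}
	return long
}

func main() {
	header := []string{"Sepal.Length", "Sepal.Width", "Petal.Length", "Petal.Width"}
	row := []string{"5.1", "3.5", "1.4", "0.2", "setosa"}
	for _, line := range pivotRow(header, row) {
		fmt.Println(line)
	}
}
```

One wide row with four measurements becomes four long rows, each tagged with the measurement name and the species — exactly the shape our pipeline will produce.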
The `main.go` file will serve as the entry point for our application. Here, we'll define each function in separate files and organize the pipeline. To standardize the handling of missing values in this pipeline, we'll define a constant `NA`. The high-level workflow includes reading the data, pivoting it to long format, and then printing the results along with any errors encountered during processing. Let's go through each function in detail.
```go
package main

import (
	"fmt"
)

const (
	NA = "9999"
)

func main() {
	data := ReadCsv("iris.csv")
	res, errLog := PivotLong(data)

	for i := 0; i < len(res); i++ {
		fmt.Print(res[i].Value, "\t", res[i].Item, "\t", res[i].Species, "\n")
	}

	for i := 0; i < len(errLog); i++ {
		fmt.Println(
			errLog[i].Time.Format("2006-01-02 15:04:05"),
			errLog[i].Message)
	}
}
```
Reading in data
The `ReadCsv` function reads in the data file. Using Go's built-in `encoding/csv` package, we define a reader with the correct delimiter and allocate a slice of slices for the data. Our function validates the data by ensuring that it's not empty, and checks the format by making sure that the first row (the header) is the longest row in the file. If rows have fewer cells than expected (missing trailing delimiters), we fill the missing cells with the `NA` constant. Similarly, if a cell is empty or contains only whitespace, we trim it and replace it with `NA`. Finally, we populate the data slice row by row.
```go
package main

import (
	"encoding/csv"
	"os"
	"strconv"
	"strings"
)

func ReadCsv(path string) [][]string {
	file, err := os.Open(path)
	if err != nil {
		ErrExit("Error opening file")
	}
	defer file.Close()

	reader := csv.NewReader(file)
	reader.Comma = ','

	data := make([][]string, 0)

	var headerLen int
	var rowCounter int
	for {
		rowCounter++
		// The read error is ignored; a nil row signals the end of the file.
		row, _ := reader.Read()
		if row == nil {
			if rowCounter < 3 {
				ErrExit("Empty csv file")
			}
			break
		}
		// The header must be the longest row; a later, longer row
		// means the file is malformed.
		if len(row) > headerLen {
			headerLen = len(row)
			if rowCounter > 1 {
				ErrExit("CSV Misformation in row " + strconv.Itoa(rowCounter))
			}
		}
		// Trim whitespace and replace empty cells with NA.
		for i := range row {
			if row[i] = strings.TrimSpace(row[i]); row[i] == "" {
				row[i] = NA
			}
		}
		// Pad short rows with NA so every row matches the header length.
		lenDiff := headerLen - len(row)
		for i := 0; i < lenDiff; i++ {
			row = append(row, NA)
		}
		data = append(data, row)
	}
	return data
}
```
Custom error handling
Our data reading function includes checks to ensure the file opens correctly and that the CSV format is as expected. Go treats errors as values, making it easy to create custom error handling functions to manage errors based on context. In this pipeline, we define two error functions: one logs errors and continues processing, while the other logs the error and exits the program. The exit function is useful for critical issues, such as a malformed file, that should prevent further execution. By contrast, the logging-only function is used in non-critical situations (like the pivot function) where we can skip over problematic rows.
Here, we define a custom error structure containing a timestamp and message, which could be useful for logging errors in a database if the pipeline is deployed in a production environment.
```go
package main

import (
	"fmt"
	"os"
	"time"
)

type errors struct {
	Time    time.Time
	Message string
}

// ErrLog records a non-fatal error and returns the updated log.
func ErrLog(errLog []errors, message string) []errors {
	errLog = append(errLog, errors{Time: time.Now(), Message: message})
	return errLog
}

// ErrExit prints the error and terminates the program.
func ErrExit(message string) {
	defer os.Exit(1)
	var errlog []errors
	errlog = append(errlog, errors{Time: time.Now(), Message: message})

	for i := 0; i < len(errlog); i++ {
		fmt.Println(
			errlog[i].Time.Format("2006-01-02 15:04:05"),
			errlog[i].Message)
	}
}
```
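Because `ErrLog` simply appends to and returns a slice, accumulating errors in a caller is a one-liner per error. A quick self-contained usage sketch (the struct and function are copied here so the snippet compiles on its own):

```go
package main

import (
	"fmt"
	"time"
)

type errors struct {
	Time    time.Time
	Message string
}

func ErrLog(errLog []errors, message string) []errors {
	return append(errLog, errors{Time: time.Now(), Message: message})
}

func main() {
	// Each call returns the grown log, mirroring how PivotLong uses it.
	var log []errors
	log = ErrLog(log, "first problem")
	log = ErrLog(log, "second problem")
	for _, e := range log {
		fmt.Println(e.Time.Format("2006-01-02 15:04:05"), e.Message)
	}
}
```

Keeping the log as a plain slice of values means it can later be flushed to a database or file in one pass, which suits a production deployment.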
Pivoting the data
As Go is statically typed, we define a struct to hold the pivoted data. The `result` struct contains fields for the measurement `Value`, the `Item` name, and the `Species` of our flower. The `PivotLong` function transforms the data into long format, and any errors encountered during parsing are logged. These error logs, along with the transformed data, are printed in the main function.
```go
package main

import (
	"strconv"
)

type result struct {
	Value   float64
	Item    string
	Species string
}

func PivotLong(data [][]string) ([]result, []errors) {
	var res []result
	var errLog []errors
	var errMess string

	for row := 1; row < len(data); row++ {
		// Skip the whole row when the species label is missing.
		species := data[row][4]
		if species == NA {
			errMess = "Species is NA in row: " + strconv.Itoa(row)
			errLog = ErrLog(errLog, errMess)
			continue
		}
		// Each of the four measurement columns becomes its own long-format row.
		for col := 0; col < 4; col++ {
			item := data[0][col]
			answerStr := data[row][col]
			if answerStr == NA {
				errMess = "NA Value in " + item + " row: " + strconv.Itoa(row)
				errLog = ErrLog(errLog, errMess)
				continue
			}
			value, err := strconv.ParseFloat(answerStr, 64)
			if err != nil {
				errMess = "Error parsing float of " + item + " in row: " + strconv.Itoa(row) + " value: " + answerStr
				errLog = ErrLog(errLog, errMess)
				continue
			}
			res = append(res, result{
				Value:   value,
				Item:    item,
				Species: species,
			})
		}
	}
	return res, errLog
}
```
Creating data pipelines in Go is more hands-on than in some higher-level languages, which gives you greater control over each step. This approach allows you to handle data in a precise manner, especially useful for catching and managing unexpected issues. Despite the lower-level nature of Go, the code remains readable and maintainable, making it suitable even for complex pipelines.
Running the pipeline prints the long-format rows, followed by the error log for the intentionally corrupted cells:

```
5.1 Sepal.Length setosa
3.5 Sepal.Width setosa
1.4 Petal.Length setosa
0.2 Petal.Width setosa
4.9 Sepal.Length setosa
3 Sepal.Width setosa
1.4 Petal.Length setosa
0.2 Petal.Width setosa
4.7 Sepal.Length setosa
1.3 Petal.Length setosa
0.2 Petal.Width setosa
4.6 Sepal.Length setosa
3.1 Sepal.Width setosa
1.5 Petal.Length setosa
0.2 Petal.Width setosa
5 Sepal.Length setosa
3.6 Sepal.Width setosa
0.2 Petal.Width setosa
5.4 Sepal.Length setosa
3.9 Sepal.Width setosa
1.7 Petal.Length setosa
0.4 Petal.Width setosa
5 Sepal.Length setosa
3.4 Sepal.Width setosa
1.5 Petal.Length setosa
4.4 Sepal.Length setosa
2.9 Sepal.Width setosa
1.4 Petal.Length setosa
0.2 Petal.Width setosa
2024-11-01 14:19:30 Error parsing float of Sepal.Width in row: 3 value: 3.2.
2024-11-01 14:19:30 Error parsing float of Petal.Length in row: 5 value: x23
2024-11-01 14:19:30 Species is NA in row: 7
2024-11-01 14:19:30 NA Value in Petal.Width row: 8
```