Building data pipelines in Go

Go
This post will cover essential first steps for building an ETL pipeline in Go, focusing on reading, transforming, and validating data with custom error handling for a robust, maintainable pipeline.
Published November 1, 2024

When building data pipelines in the ETL (Extract, Transform, Load) format, the goal is to extract data from a source, transform it into a usable format, and load it into a destination for analysis or storage. Go (often called Golang) has gained popularity in data engineering thanks to a few key advantages: its simplicity, a rich standard library, and minimal reliance on external dependencies. Go’s syntax is straightforward and easy to read, which makes code easier for teams to maintain and collaborate on. Additionally, Go compiles to fast native binaries and manages memory automatically with a garbage collector, making it a solid choice for data-intensive applications.

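In this post, we will walk through an example of loading the Iris dataset, transforming it to a “long format,” and handling data errors gracefully along the way. The first rows of the input file, iris.csv, are shown below; note the malformed values 3.2. and x23, which the pipeline has to cope with.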

Sepal.Length  Sepal.Width  Petal.Length  Petal.Width  Species
5.1           3.5          1.4           0.2          setosa
4.9           3            1.4           0.2          setosa
4.7           3.2.         1.3           0.2          setosa
4.6           3.1          1.5           0.2          setosa
5.0           3.6          x23           0.2          setosa
5.4           3.9          1.7           0.4          setosa

The main.go file serves as the entry point for our application. Each pipeline function lives in its own file within the same main package, and main ties them together. To standardize the handling of missing values, we define a constant NA holding the sentinel string "9999". The high-level workflow is to read the data, pivot it to long format, and then print the results along with any errors encountered during processing. Let’s go through each function in detail.

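package main

import (
    "fmt"
)

// NA is the sentinel that marks a missing value throughout the pipeline.
const (
    NA = "9999"
)

func main() {
    // Extract: load the raw CSV into a slice of rows (row 0 is the header).
    data := ReadCsv("iris.csv")

    // Transform: pivot to long format, collecting non-fatal errors on the way.
    res, errLog := PivotLong(data)

    // Print one long-format record per line in fixed-width columns.
    for _, r := range res {
        fmt.Printf("%-12v%-24s%-10s\n", r.Value, r.Item, r.Species)
    }

    // Print every error collected during the pivot, with its timestamp.
    for _, e := range errLog {
        fmt.Println(e.Time.Format("2006-01-02 15:04:05"), e.Message)
    }
}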
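The sentinel approach keeps every cell a string, but it assumes that 9999 never appears as a real measurement; if it did, PivotLong would report it as a missing value and drop it. Where that assumption is a concern, one alternative is to carry a separate validity flag next to each number. The standard library's sql.NullFloat64 (from database/sql) already has that shape, a Float64 value plus a Valid flag, so the minimal sketch below reuses it. The parseCell helper is hypothetical and not part of the original pipeline.

```go
package main

import (
    "database/sql"
    "fmt"
    "strconv"
    "strings"
)

// parseCell (hypothetical) converts a raw CSV cell into a nullable float.
// Blank cells come back with Valid == false instead of a sentinel string.
func parseCell(raw string) (sql.NullFloat64, error) {
    s := strings.TrimSpace(raw)
    if s == "" {
        return sql.NullFloat64{}, nil // missing: Valid is false
    }
    v, err := strconv.ParseFloat(s, 64)
    if err != nil {
        return sql.NullFloat64{}, err // malformed, e.g. "x23"
    }
    return sql.NullFloat64{Float64: v, Valid: true}, nil
}

func main() {
    for _, raw := range []string{"5.1", "   ", "9999", "x23"} {
        cell, err := parseCell(raw)
        fmt.Printf("%q valid=%v value=%v err=%v\n", raw, cell.Valid, cell.Float64, err)
    }
}
```

Because missingness lives in Valid rather than in the value itself, a genuine reading of 9999 survives, while the parse error for a cell like x23 is still returned to the caller to log.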

Reading in data

The ReadCsv function reads in the data file. Using Go’s standard encoding/csv package, we create a reader with the expected delimiter and build up the data as a slice of slices, one inner slice per row. The function validates the input in two ways: it requires at least a header and one data row, and it requires the first row (the header) to be the longest row in the file. If a row has fewer cells than the header (for example, because trailing delimiters are missing), we fill the missing cells with the NA constant. Likewise, if a cell is empty or contains only whitespace, we trim it and replace it with NA. Finally, we append each cleaned row to the data slice.

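package main

import (
    "encoding/csv"
    "io"
    "os"
    "strconv"
    "strings"
)

// ReadCsv loads a CSV file into a slice of rows, with the header as row 0.
// Empty cells become NA and short rows are padded with NA to the header length.
func ReadCsv(path string) [][]string {
    file, err := os.Open(path)
    if err != nil {
        ErrExit("Error opening file: " + err.Error())
    }
    defer file.Close()

    reader := csv.NewReader(file)
    reader.Comma = ','
    // Accept rows of any length; the length checks below are done by hand.
    reader.FieldsPerRecord = -1

    data := make([][]string, 0)

    var headerLen int
    var rowCounter int
    for {
        rowCounter++
        row, err := reader.Read()
        if err == io.EOF {
            // Fewer than two rows means there is no data below the header.
            if rowCounter < 3 {
                ErrExit("Empty csv file")
            }
            break
        }
        if err != nil {
            ErrExit("Error reading csv: " + err.Error())
        }

        // The header must be the longest row; a longer data row is malformed.
        if len(row) > headerLen {
            if rowCounter > 1 {
                ErrExit("Malformed CSV at line " + strconv.Itoa(rowCounter))
            }
            headerLen = len(row)
        }

        // Trim whitespace and mark empty cells as missing.
        for i := range row {
            if row[i] = strings.TrimSpace(row[i]); row[i] == "" {
                row[i] = NA
            }
        }

        // Pad rows that are missing trailing cells.
        for len(row) < headerLen {
            row = append(row, NA)
        }

        data = append(data, row)
    }

    return data
}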
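ReadCsv relies on a detail of encoding/csv worth knowing: by default (FieldsPerRecord == 0) the reader takes the field count of the first record as the expected length, and for any later row of a different length Read returns that row together with an error wrapping csv.ErrFieldCount. Discarding the error from Read therefore still yields short rows, but it also hides genuine parse failures. Setting FieldsPerRecord to a negative value switches the length check off, leaving padding and validation to our own code. The standalone program below, built around a hypothetical readAll helper, shows both settings on a three-line input.

```go
package main

import (
    "encoding/csv"
    "errors"
    "fmt"
    "io"
    "strings"
)

// readAll (hypothetical) reads every record from input and reports how many
// records came back with a field-count error.
func readAll(input string, fieldsPerRecord int) ([][]string, int, error) {
    r := csv.NewReader(strings.NewReader(input))
    r.FieldsPerRecord = fieldsPerRecord
    var rows [][]string
    fieldCountErrs := 0
    for {
        row, err := r.Read()
        if err == io.EOF {
            return rows, fieldCountErrs, nil
        }
        if errors.Is(err, csv.ErrFieldCount) {
            // Read still returns the mismatched record alongside the error.
            fieldCountErrs++
        } else if err != nil {
            return rows, fieldCountErrs, err
        }
        rows = append(rows, row)
    }
}

func main() {
    // The last row is missing its trailing cell.
    input := "a,b,c\n1,2,3\n4,5\n"

    // Default (0): the first record fixes the expected length.
    rows, n, err := readAll(input, 0)
    fmt.Println(len(rows), n, err) // 3 1 <nil>

    // Negative: variable-length rows are accepted without an error.
    rows, n, err = readAll(input, -1)
    fmt.Println(len(rows), n, err) // 3 0 <nil>
}
```

This is a standalone program. Inside the pipeline's own package, importing the standard errors package would clash with the custom errors type defined in the next section, because Go does not allow an import name and a package-level identifier to share a name; comparing err == io.EOF directly avoids that import.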

Custom error handling

Our data reading function includes checks to ensure the file opens correctly and that the CSV format is as expected. Go treats errors as values, which makes it easy to write custom helpers that handle errors according to their context. In this pipeline, we define two error functions: one records the error and lets processing continue, while the other prints the error and exits the program. The exit function is meant for critical issues, such as a malformed file, that should stop execution. By contrast, the logging-only function is used in non-critical situations (like the pivot function), where we can skip problematic cells or rows and keep going.

Here, we define a custom error structure containing a timestamp and message, which could be useful for logging errors in a database if the pipeline is deployed in a production environment.

package main

import (
    "fmt"
    "os"
    "time"
)

// errors is a single log entry: when a problem occurred and what it was.
type errors struct {
    Time    time.Time
    Message string
}

// ErrLog records a non-critical error and returns the extended log,
// so the caller can keep processing.
func ErrLog(errLog []errors, message string) []errors {
    return append(errLog, errors{Time: time.Now(), Message: message})
}

// ErrExit prints a critical error with a timestamp to stderr and exits with
// status 1. os.Exit skips deferred calls (such as file.Close in ReadCsv),
// so nothing after this runs.
func ErrExit(message string) {
    fmt.Fprintln(os.Stderr, time.Now().Format("2006-01-02 15:04:05"), message)
    os.Exit(1)
}

Pivoting the data

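Because Go is statically typed, we define a struct to hold the pivoted data. The result struct contains fields for the measurement Value, the Item (the measurement name taken from the header), and the Species of the flower. The PivotLong function transforms the data into long format: each wide row with four measurements becomes up to four result records, one per measurement. Rows whose species is missing are skipped entirely, while missing or unparseable measurements are skipped individually; every skip is recorded in the error log. The error log and the transformed data are then printed in the main function.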

package main

import (
    "strconv"
)

// result is one long-format observation: a single measurement value,
// the measurement name (Item), and the flower's species.
type result struct {
    Value   float64
    Item    string
    Species string
}

// PivotLong turns each wide row (four measurements plus Species) into up to
// four long-format records. Problem cells are logged and skipped.
func PivotLong(data [][]string) ([]result, []errors) {
    var res []result
    var errLog []errors

    // Iris layout: four measurement columns followed by Species.
    const speciesCol = 4
    if len(data) == 0 || len(data[0]) <= speciesCol {
        ErrExit("Unexpected header: expected four measurement columns and Species")
    }
    header := data[0]

    for row := 1; row < len(data); row++ {
        species := data[row][speciesCol]
        if species == NA {
            errLog = ErrLog(errLog, "Species is NA in row: "+strconv.Itoa(row))
            continue
        }

        for col := 0; col < speciesCol; col++ {
            item := header[col]
            answerStr := data[row][col]
            if answerStr == NA {
                errLog = ErrLog(errLog, "NA Value in "+item+" row: "+strconv.Itoa(row))
                continue
            }

            value, err := strconv.ParseFloat(answerStr, 64)
            if err != nil {
                errLog = ErrLog(errLog, "Error parsing float of "+item+" in row: "+strconv.Itoa(row)+" value: "+answerStr)
                continue
            }

            res = append(res, result{Value: value, Item: item, Species: species})
        }
    }
    return res, errLog
}
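PivotLong depends on fixed positions: columns 0 to 3 are measurements and column 4 is Species, so a file with reordered or additional columns would be mislabeled. A sketch of a more general variant, assuming the identifier column can be located by its header name and that every other column holds a numeric measurement. The names pivotByName and longRow are hypothetical, and problems are collected as plain strings to keep the example self-contained.

```go
package main

import (
    "fmt"
    "strconv"
)

// longRow is one long-format record (hypothetical counterpart of result).
type longRow struct {
    Value float64
    Item  string
    ID    string
}

// pivotByName melts a wide table into long format. idCol names the column
// identifying each row (e.g. "Species"); all other columns are measurements.
// Cells equal to na, or that fail to parse as floats, are skipped and reported.
func pivotByName(data [][]string, idCol, na string) ([]longRow, []string, error) {
    if len(data) == 0 {
        return nil, nil, fmt.Errorf("no header row")
    }
    header := data[0]
    id := -1
    for i, name := range header {
        if name == idCol {
            id = i
            break
        }
    }
    if id < 0 {
        return nil, nil, fmt.Errorf("column %q not found", idCol)
    }

    var out []longRow
    var problems []string
    for r := 1; r < len(data); r++ {
        row := data[r]
        if len(row) != len(header) || row[id] == na {
            problems = append(problems, fmt.Sprintf("skipping row %d", r))
            continue
        }
        for c, cell := range row {
            if c == id {
                continue
            }
            if cell == na {
                problems = append(problems, fmt.Sprintf("NA in %s row %d", header[c], r))
                continue
            }
            v, err := strconv.ParseFloat(cell, 64)
            if err != nil {
                problems = append(problems, fmt.Sprintf("bad float %q in %s row %d", cell, header[c], r))
                continue
            }
            out = append(out, longRow{Value: v, Item: header[c], ID: row[id]})
        }
    }
    return out, problems, nil
}

func main() {
    // A trimmed-down excerpt of the sample data.
    data := [][]string{
        {"Sepal.Length", "Sepal.Width", "Species"},
        {"5.1", "3.5", "setosa"},
        {"4.7", "3.2.", "setosa"},
    }
    rows, problems, err := pivotByName(data, "Species", "9999")
    if err != nil {
        fmt.Println(err)
        return
    }
    for _, r := range rows {
        fmt.Println(r.Value, r.Item, r.ID)
    }
    fmt.Println(problems)
}
```

In the pipeline itself, this would be called as pivotByName(data, "Species", NA); the trade-off is that any non-numeric column other than the identifier is reported as a parse problem instead of being ignored.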

Creating data pipelines in Go is more hands-on than in some higher-level languages, but that gives you greater control over each step. You decide exactly how every malformed or missing value is handled, which is especially useful for catching and managing unexpected issues. Despite this extra effort, the code remains readable and maintainable, making Go suitable even for complex pipelines. Running the pipeline on our test file prints the long-format records followed by the error log:

5.1         Sepal.Length            setosa    
3.5         Sepal.Width             setosa    
1.4         Petal.Length            setosa    
0.2         Petal.Width             setosa    
4.9         Sepal.Length            setosa    
3           Sepal.Width             setosa    
1.4         Petal.Length            setosa    
0.2         Petal.Width             setosa    
4.7         Sepal.Length            setosa    
1.3         Petal.Length            setosa    
0.2         Petal.Width             setosa    
4.6         Sepal.Length            setosa    
3.1         Sepal.Width             setosa    
1.5         Petal.Length            setosa    
0.2         Petal.Width             setosa    
5           Sepal.Length            setosa    
3.6         Sepal.Width             setosa    
0.2         Petal.Width             setosa    
5.4         Sepal.Length            setosa    
3.9         Sepal.Width             setosa    
1.7         Petal.Length            setosa    
0.4         Petal.Width             setosa    
5           Sepal.Length            setosa    
3.4         Sepal.Width             setosa    
1.5         Petal.Length            setosa    
4.4         Sepal.Length            setosa    
2.9         Sepal.Width             setosa    
1.4         Petal.Length            setosa    
0.2         Petal.Width             setosa    
2024-11-01 14:19:30 Error parsing float of Sepal.Width in row: 3 value: 3.2.
2024-11-01 14:19:30 Error parsing float of Petal.Length in row: 5 value: x23
2024-11-01 14:19:30 Species is NA in row: 7
2024-11-01 14:19:30 NA Value in Petal.Width row: 8