SQL Server, A Million Updates, Multithreading and Queues

In the past month or two at work, I've had two projects that involved massive updates of data. Coincidentally, both amounted to the same thing: pulling data from a source, processing it, and updating SQL Server. I've learned a lot.

First, SQL Server does not respond well to multiple threads doing thousands of updates each. I did not know this. I've seen the error message, "Transaction (Process ID XX) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction." more times than I'd like to admit. I've run SQL updates from multiple threads many times before, but I guess never with tens of thousands of rows.

I wrote two apps that each process hundreds of thousands of rows of data. One was written in C#, the other in Python. I'm not quite as adept in Python, but I've picked up some tricks.

The approach I took in each language was nearly the same. Both involved creating a shared queue to hold all of the SQL statements that need to run; the SQL statements are just stored procedure calls. A single worker then goes through the queue, batches the statements into chunks of 15-30, and executes each batch.

The two solutions, Python and C#, were slightly different though. In Python, the multiple threads add to the queue, and only after all the threads are done processing does a single pass drain it. The C# solution involved a singleton object (one per connection string) that held the queue and ran its own thread to constantly process it. Just one thread, though, so it never overwhelmed SQL Server. In each language I used the built-in queue from the standard library, although in C# it was the ConcurrentQueue. Here's a little bit of code.

C# pseudocode



        multiple threads collecting data
        {
              call service
              add all data to be updated to the service
        }

        service data collection
        {
              get sql for object (orm)
              add to the shared queue
        }


        sql execution thread - run
        {
            while (true)
            {
                open connection

                while queue has stuff
                {
                    create a string builder, add 15 or so commands to the batch, separated by ;

                    build a database command and execute it.
                }

                close connection.
                sleep for a second.
            }
        }

Python pseudocode


        multiple threads collecting data
        {
             download data (in this case, just downloading CSVs)
             process data
             get sql statement
             add sql statement to the shared queue
        }

        main thread
        {
             collect all data (fill the queue) across multiple threads
             process the queue in a single thread, executing in batches of 35 in this case
        }

So as you can see, the C# version processes the queue while the data is being collected, and the Python version waits until the end and then processes the whole thing. I think the C# approach is better; as I said, I'm a bit more adept with C#, so I'm more comfortable doing things like that there. Hopefully this helps someone out there with loads of data to process.

Of course, in Go I would have just used a channel! Just kidding, there'd be about the same amount of complexity in Go, but the end result would definitely be a lot better looking!
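In fact, here's a minimal sketch of the same single-writer batching idea in Go, with a channel standing in for the shared queue. The execBatch function and the dbo.UpdateWidget procedure are stand-ins I made up; a real version would execute each joined batch over an open database connection.

package main

import (
    "fmt"
    "strings"
    "sync"
    "time"
)

// execBatch is a stand-in for the real database call; a real app would
// run the joined statements on one open connection, like the C# version.
func execBatch(sql string) {
    fmt.Println("exec:", sql)
}

// writer is the only goroutine that talks to the database. It drains the
// channel, batching up to 15 statements per round trip, and flushes a
// partial batch if nothing new arrives for a second.
func writer(stmts <-chan string, done chan<- struct{}) {
    batch := make([]string, 0, 15)
    flush := func() {
        if len(batch) > 0 {
            execBatch(strings.Join(batch, "; "))
            batch = batch[:0]
        }
    }
    for {
        select {
        case s, ok := <-stmts:
            if !ok { // producers closed the channel; flush what's left
                flush()
                close(done)
                return
            }
            batch = append(batch, s)
            if len(batch) == cap(batch) {
                flush()
            }
        case <-time.After(time.Second):
            flush()
        }
    }
}

func main() {
    stmts := make(chan string, 100)
    done := make(chan struct{})
    go writer(stmts, done)

    // Several producers add statements to the shared queue, like the
    // data-collection threads in the C# and Python versions.
    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            for j := 0; j < 10; j++ {
                stmts <- fmt.Sprintf("EXEC dbo.UpdateWidget %d, %d", n, j)
            }
        }(i)
    }
    wg.Wait()
    close(stmts)
    <-done // let the writer flush the final batch
}

The channel replaces the shared queue, the single writer goroutine plays the role of the C# singleton's thread, and cap(batch) is the 15-30 statement chunk size.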

Happy Coding!

Go Concurrency à la Rob Pike

I watched Rob Pike's talk, "Concurrency is not Parallelism", and I wanted to take what he was saying with his gopher example and write a program that closely followed his model.

My example program comes from rewriting this website in Go: it's the piece that fetches photos from my Flickr account tagged "jtccom" to use as header images. It uses the Flickr JSON API, which is pretty easy to work with.

There are multiple steps to using the Flickr API here, three separate web calls, which makes this ideal for Go concurrency-style programming. The first step is to get the photos from my account tagged with "jtccom"; this returns an array of photo IDs and photo Secrets. To get the URL for a photo, you first have to get its sizes, which is a separate call to the Flickr API and returns an array of, among other things, Label and Source. Source is the URL, Label is the size name, and in this case I'm only interested in the size with the "Original" label. The last step is to download the content pointed to by the "Original" size's Source.

So the idea was to have a goroutine that gets photos (step 1), another that gets sizes (step 2), and a third that downloads the content (step 3). Conceptually, it looks like this:

getPhotos(photos chan Photo)  // pumps photos into the photos channel
getSizes(photos chan Photo, sizes chan []Size) // pumps sizes for each photo into the sizes channel after calling the API for the photos in the photos channel
downloadPhoto(label string, size chan []Size, photoContent chan PhotoContent) // download files of size 'label' from the Size channel and pump it into the PhotoContent channel

Realistically, it works pretty much in order, because the calls to getPhotos and getSizes finish way before the downloads do, as each file is around 9-12 MB. But at least getPhotos and getSizes can run pretty much in parallel.

Code-wise, it looks very similar, just with goroutines, some object-style things, JSON parsing, etc.

For clarity I broke the Flickr-specific calls out into a separate file, but not a separate package. Here's the "flickrsvc.go" file, with hidden things like the API key obfuscated.

package main

import (
    "fmt"
    "time"
    "sync"
)

// saveFiles drains the content channel. Saving to the tmp and dest folders
// isn't implemented yet; for now it just logs each completed download.
func saveFiles(tmp, dest string, photoContent chan PhotoContent) {
    for pc := range photoContent {
        fmt.Println("Downloaded", pc.Photo.Id, "of size", len(pc.Content))
    }
}

func process(){
    var apiKey = "blahblah"
    var userId = "28240873@N07"
    var tag = "jtccom"

    var tmp = "../jtccom/content/tmp_download/"
    var destination = "../jtccom/static/images/backgrounds/"

    procWG := sync.WaitGroup{}

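    // Unbuffered channels connect the three stages of the pipeline.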
    photos := make(chan Photo)
    sizes := make(chan PhotoWithSizes)
    content := make(chan PhotoContent)

    procWG.Add(3)
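    // Each wrapper goroutine closes its stage's output channel when the
    // stage returns, so the next stage's range loop ends and the whole
    // pipeline shuts down cleanly.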
    go func(){
        getPhotosByTag(tag, apiKey, userId, photos)
        close(photos)
        procWG.Done()
    }()

    go func(){
        getPhotoSizes(apiKey, photos, sizes)
        close(sizes)
        procWG.Done()
    }()

    go func(){
        downloadPhotos("Original", sizes, content)
        close(content)
        procWG.Done()
    }()

    saveFiles(tmp, destination, content)

    fmt.Println("wait procWG")
    procWG.Wait()
}

func main(){

    for {
        fmt.Println("going")
        process()

        fmt.Println("wait wg")

        fmt.Println("Sleeping")
        time.Sleep(3*time.Second)
    }
}

And here is the output:

C:\Users\jconnell\Documents\go\src\jtccom.flickrsvc>jtccom.flickrsvc.exe
going
Downloaded 14685510038 of size 9867146
Downloaded 14465862480 of size 11279714
Downloaded 14649298391 of size 9423168
Downloaded 14076004795 of size 8925512
Downloaded 13936652032 of size 14851399
Downloaded 12076007194 of size 14099167
Downloaded 11678436824 of size 9671802
Downloaded 11507180674 of size 13510941
Downloaded 11507190024 of size 11963353
Downloaded 11412952753 of size 13030709

Here is flickr.go (although it doesn't matter what it's called).

package main

import (
    "strings"
    "net/http"
    "net/url"
    "encoding/json"
    "io/ioutil"
)

type Response struct {
    Wrap Photos `json:"photos"`
}

type Photos struct {
    Photo []Photo     `json:"photo"`
}

type Photo struct {
    Id string     `json:"id"`
    Secret string `json:"secret"`
}

type SizeArray []Size

func (sizeArray SizeArray) GetSize(label string) Size {
    var size Size
    for _,sz := range sizeArray {
        if strings.EqualFold(sz.Label, label) {
            size = sz
            break
        }
    }
    return size
}

type SizesResponse struct {
    Wrap Sizes `json:"sizes"`
}

type Sizes struct {
    Sizes SizeArray `json:"size"`
}

type Size struct {
    Label string `json:"label"`
    Source string `json:"source"`
}

type PhotoWithSizes struct {
    Photo *Photo
    Sizes SizeArray
}

type PhotoContent struct {
    Photo *Photo
    Content []byte
}

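// getPhotosByTag calls flickr.photos.search and pumps each matching photo
// into pchan; the caller closes the channel when this returns.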
func getPhotosByTag(tag, apiKey, userId string, pchan chan Photo)  {
    qs := url.Values{}
    qs.Add("method", "flickr.photos.search")
    qs.Add("api_key", apiKey)
    qs.Add("user_id", userId)
    qs.Add("tags", tag)
    qs.Add("format", "json")
    qs.Add("nojsoncallback", "1")

    flickrUrl, _ := url.Parse("https://api.flickr.com/services/rest/?" + qs.Encode())

    if resp,err := http.Get(flickrUrl.String()); err == nil {
        defer resp.Body.Close()
        decoder := json.NewDecoder(resp.Body)

        photos := Response{}
        decoder.Decode(&photos)

        for _, p := range photos.Wrap.Photo {
            pchan <- p
        }
    } else {
        panic(err)
    }
}

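// downloadPhotos fetches the image bytes at the Source URL matching the
// requested size label for each incoming photo, and sends them downstream.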
func downloadPhotos(sizeLabel string, download chan PhotoWithSizes, downloaded chan PhotoContent)  {
    for p := range download {
        url := p.Sizes.GetSize(sizeLabel).Source
        if resp,err := http.Get(url); err == nil {
            bytes,err := ioutil.ReadAll(resp.Body)
            resp.Body.Close()

            if err != nil {
                panic(err)
            } else {
                pc := PhotoContent{ Photo: p.Photo, Content: bytes }
                downloaded <- pc
            }
        } else {
            panic(err)
        }
    }
}

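// getPhotoSizes calls flickr.photos.getSizes for each incoming photo and
// pairs the photo with its available sizes.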
func getPhotoSizes(apiKey string, photos chan Photo, photoSizes chan PhotoWithSizes) {
    for p := range photos {
        qs := url.Values{}
        qs.Add("method", "flickr.photos.getSizes")
        qs.Add("api_key", apiKey)
        qs.Add("photo_id", p.Id)
        qs.Add("format", "json")
        qs.Add("nojsoncallback", "1")

        if sizesUrl, err := url.Parse("https://api.flickr.com/services/rest/?" + qs.Encode()); err == nil {
            if resp,err := http.Get(sizesUrl.String()); err == nil {
                decoder := json.NewDecoder(resp.Body)
                sizeResp := SizesResponse{}
                decoder.Decode(&sizeResp)
                resp.Body.Close()

                p := p // copy the loop variable; &p would otherwise point at storage reused every iteration
                photoWithSizes := PhotoWithSizes{ Photo: &p, Sizes: sizeResp.Wrap.Sizes }
                photoSizes <- photoWithSizes
            } else {
                panic(err)
            }
        }
    }
}

I had some problems early on where I had the Flickr methods returning channels, and that just wasn't working. I also had to experiment with buffered vs. unbuffered channels, internal sync.WaitGroups, and other things that weren't working out so well. I will play around with this more, since apparently you can use a WaitGroup without using channels at all; I definitely want a better understanding of why the things I tried initially didn't work.

But it's working now. I just have to finish it by saving the files to the destination folder and checking whether an image has already been downloaded. For future me: that check would fit nicely as a func that takes a channel and outputs to another channel only the files that haven't been downloaded yet, keeping with the channel-passing style I've used so far. A rough sketch of that idea follows.
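Here's a minimal sketch of what that filter stage might look like. Two assumptions to flag: it pretends the eventual save step will write files as <photo ID>.jpg into the destination folder (a naming scheme I'm making up here), and it needs "os" and "path/filepath" added to the imports.

// notYetDownloaded forwards only the photos that have no file on disk yet.
func notYetDownloaded(dest string, in chan PhotoWithSizes, out chan PhotoWithSizes) {
    for p := range in {
        // The file name is an assumed convention: <photo ID>.jpg in dest.
        path := filepath.Join(dest, p.Photo.Id+".jpg")
        if _, err := os.Stat(path); err == nil {
            continue // already on disk, skip the download
        }
        out <- p
    }
}

It would wire into process() just like the other stages: one more goroutine sitting between getPhotoSizes and downloadPhotos, with the wrapper closing the output channel and calling procWG.Done() when the input channel is drained.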