Introduction

Me and my former research adviser, Dr. Zhaochen He at Christopher Newport University, coauthored a paper that studies the impact of several anti-immigration laws issued in early 2010s on employment and income for Americans working legally in these states draft available here. This research project uses the Quarterly Workforce Indicators (QWI) data, and following code chunks do the preparation part of the empirical analysis:

library(tidyverse)
library(data.table)
library(RJDemetra)
library(htmltab)
library(tidycensus)
library(lubridate)
library(rlist)
library(doParallel)
library(foreach)
library(parallel)

setwd('E:/Github/proj1')

Step 1: Select the Observations & Variables of Interest

Here is a preview of the raw QWI data:

qwi = fread('qwi_naics2_eth_edu.csv')
# to save time and computing power, only select of subset of our data to demonstrate
qwi = qwi[industry == '00' & race == 'A0' & ethnicity == 'A0' & education == 'E0']
head(qwi,10)
##     geography industry sex agegrp race ethnicity education year quarter    Emp
##  1:      2013       00   0    A00   A0        A0        E0 2009       1    844
##  2:      2016       00   0    A00   A0        A0        E0 2009       1   3285
##  3:      2020       00   0    A00   A0        A0        E0 2009       1 143633
##  4:      2050       00   0    A00   A0        A0        E0 2009       1   5350
##  5:      2060       00   0    A00   A0        A0        E0 2009       1    522
##  6:      2068       00   0    A00   A0        A0        E0 2009       1    591
##  7:      2070       00   0    A00   A0        A0        E0 2009       1   1845
##  8:      2090       00   0    A00   A0        A0        E0 2009       1  30261
##  9:      2100       00   0    A00   A0        A0        E0 2009       1    638
## 10:      2105       00   0    A00   A0        A0        E0 2009       1    384
##       EmpS EmpTotal  HirA HirAS SepS FrmJbC FrmJbCS EarnS    Payroll
##  1:    754     1083   239    63   56     89       8  2705    7344977
##  2:   3092     4841  1555   159  126   1028      35  2796   32880949
##  3: 128225   166637 23004 13432 9817   1220    3592  4158 1758547289
##  4:   4628     6587  1237   632  439    -37     203  2493   40342661
##  5:    479      619    97    50   29     29      21  3434    5160227
##  6:    559      679    87    41   21     31      19  5132    8660429
##  7:   1645     2290   446   204  130     86      81  3034   16581626
##  8:  26831    35786  5525  2827 2240    549     581  3440  305479243
##  9:    581      765   127    78   37     32      41  2615    4954830
## 10:    349      474    90    39   22     14      17  1999    2301836

We are only interested in certain observations and variables, the chunk below does the selection:

# create a date variable called yq
qwi = qwi[, yq := paste(year,'-',quarter)][, dt := yq(yq)]

# extract the variables of interest
qwi = qwi[,statefipsn := floor(geography/1000)][,cntyfipsn := geography - statefipsn*1000]
qwi = qwi[, c("dt","geography","statefipsn","cntyfipsn","industry",
              "race","ethnicity","education","Emp","HirA","HirAS", 
              "FrmJbC","EarnS"), with = F]
names(qwi)[2] = 'geo'

# select of states/counties of interest
statelist = c(18,26,17,21,39,4,32,8,35,6,
              49,32,16,56,8,1,12,28,47,13,
              37,12,47,45,37)
qwi = qwi[statefipsn %in% statelist]
qwi = unique(qwi)

# convert Some Variabes to Numeric
varnames = c('Emp', 'HirA', 'HirAS', 'FrmJbC', 'EarnS')
qwi = qwi[, (varnames) := lapply(.SD, as.numeric), .SDcols = varnames]

# reshape the table to long form
id.vars = c('dt', 'geo','industry', 'race', 'ethnicity', 'education', 'statefipsn', 'cntyfipsn')
qwi = melt(qwi, id.vars = id.vars)
head(qwi,10)
##             dt  geo industry race ethnicity education statefipsn cntyfipsn
##  1: 2009-01-01 1001       00   A0        A0        E0          1         1
##  2: 2009-01-01 1003       00   A0        A0        E0          1         3
##  3: 2009-01-01 1005       00   A0        A0        E0          1         5
##  4: 2009-01-01 1007       00   A0        A0        E0          1         7
##  5: 2009-01-01 1009       00   A0        A0        E0          1         9
##  6: 2009-01-01 1011       00   A0        A0        E0          1        11
##  7: 2009-01-01 1013       00   A0        A0        E0          1        13
##  8: 2009-01-01 1015       00   A0        A0        E0          1        15
##  9: 2009-01-01 1017       00   A0        A0        E0          1        17
## 10: 2009-01-01 1019       00   A0        A0        E0          1        19
##     variable value
##  1:      Emp 11174
##  2:      Emp 56894
##  3:      Emp  9371
##  4:      Emp  3604
##  5:      Emp  8500
##  6:      Emp  2960
##  7:      Emp  6203
##  8:      Emp 42561
##  9:      Emp  7083
## 10:      Emp  4872

Step 2: Ajusted Seasonality

The time series (e.g., Emp for total employment of a county in a certain quarter) have seasonal fluctuations, here we use RJDemetra package to smooth the seasonal swing:

# this is the seasonality adjustment function:
adjust = function(x) {
  z = ts(x, start = 2009, frequency = 4)
  z = tramoseats(z, spec = "RSA5")
  z = z$final$series[,2]
  return(as.numeric(z))
}

Here we construct a table called dir_all in which each observation specifies a certain time series:

# INPUT:
# breakdown by race & eth
naics = unique(qwi$industry)
g = unique(qwi$geo)
edu = unique(qwi$education)
r = unique(qwi$race)
eth = unique(qwi$ethnicity)
var = unique(qwi$variable)

dir_1 = as.data.table(expand.grid(geo = g, race = r, ethnicity = eth, industry = naics,
                                  education = 'E0', variable = var))
# breakdown by edu, but less EO, which is included in dir_1
dir_2 = as.data.table(expand.grid(geo = g, race = 'A0', ethnicity = 'A0', industry = naics,
                                  education = edu[2:6], variable = var))
dir_all = rbind(dir_1, dir_2)
dir_all = dir_all[1:1000] # again, this is to save time since it's just for demonstration
dir_all$tsid = 1:nrow(dir_all)
head(dir_all,10)
##      geo race ethnicity industry education variable tsid
##  1: 1001   A0        A0       00        E0      Emp    1
##  2: 1003   A0        A0       00        E0      Emp    2
##  3: 1005   A0        A0       00        E0      Emp    3
##  4: 1007   A0        A0       00        E0      Emp    4
##  5: 1009   A0        A0       00        E0      Emp    5
##  6: 1011   A0        A0       00        E0      Emp    6
##  7: 1013   A0        A0       00        E0      Emp    7
##  8: 1015   A0        A0       00        E0      Emp    8
##  9: 1017   A0        A0       00        E0      Emp    9
## 10: 1019   A0        A0       00        E0      Emp   10

Here is the parallel loop that does the seasonality adjustment:

# divide the list into smaller lists
nchunks = 10
chunksize = ceiling(nrow(dir_all)/nchunks)
dir_all[, chunk := rep(1:nchunks, each = chunksize)[1:nrow(dir_all)]]

# specify the computing power we want to use
setkey(qwi, geo, race, ethnicity, industry, education, variable)
ncore = 6
setDTthreads(2)
dirpost = NULL


# ADJUSTMENT LOOP:
for (j in 1:nchunks) {
  
  dir = dir_all[chunk == j]
  start_time = Sys.time()
  
  # preload data into a giant list
  l.raw = list()
  l.date = list()
  
  for (i in 1:nrow(dir)) {
    if (i %% 10000 == 0) { print(i) }
    # uses key to extract time series
    z = qwi[list(dir[i, geo], dir[i, race], dir[i, ethnicity], 
                 dir[i, industry], dir[i, education], dir[i, variable]), .(value,dt)]
    l.raw[[i]] = z$value
    l.date[[i]] = z$dt
  }
  
  # run adjustment in parallel
  start_time = Sys.time()
  try(file.remove(paste0('adj/logs/chunk_', j, '.txt')))
  cl = makePSOCKcluster(ncore, outfile = paste0('adj/logs/chunk_', j, '.txt'))
  registerDoParallel(cl)
  adj_out = 
    tryCatch({
      foreach(series = l.raw, date = l.date, i = icount(),
              .packages = c('foreach', 'data.table', 'RJDemetra'),
              .errorhandling = 'pass'
      ) %dopar% {
        info = dir[i]
        series_adj = adjust(series)
        identical = identical(series, series_adj)
        samelength = length(series) == length(series_adj)
        if (samelength) {
          outdt = data.table(dt = date,
                             geo = info[, geo],
                             race = info[, race],
                             ethnicity = info[, ethnicity],
                             industry = info[, industry],
                             education = info[, education],
                             variable = info[, variable],
                             tsid = info[, tsid],
                             value = series,
                             value_adj = series_adj)
        }
        cat(i, '\n') # record iteration number
        res = list(chunk = j, row = i, identical = identical, samelength = samelength, outdt=outdt)
        return(res)
      }
    }, error = function(e) {return(e)})
  stopCluster(cl)
  
  # error reporting
  # report parallel loop error
  dir[, error1 := 'good']
  if ('error' %in% class(adj_out)) {
    dir[, error1 := paste0(as.character(adj_out), collapse = '_')]
  }
  
  dir[, error2 := 'good']
  for (i in 1:length(adj_out)) {
    if ('error' %in% class(adj_out[[i]])) {
      dir[i, 'error2'] = paste0(as.character(adj_out[[i]]), collapse = '_')
    }
  }
  
  saveRDS(adj_out, file = paste0('adj/adj_out_chunk_', j, '.rds'))
  fwrite(dir, file = paste0('adj/dir_chunk_', j, '.csv'))
  
}

qwi_adj = NULL
for (i in 1:(nchunks)) {
  #i = 1
  chunk = readRDS(paste0('adj/adj_out_chunk_', i, '.rds'))
  z1 = unlist(lapply(chunk, length)) == 5 # Items with length !=5 are erroneous
  chunk_clean = chunk[z1]
  z2 = unlist(lapply(chunk_clean, '[[', 'samelength')) # Items w/ samelength = F are erroneous #***
  chunk_clean = chunk_clean[z2]
  qwi_adj_chunk = lapply(chunk_clean, '[[', 'outdt') 
  qwi_adj_chunk = rbindlist(qwi_adj_chunk)
  qwi_adj = rbind(qwi_adj, qwi_adj_chunk)
}
head(qwi_adj, 10)
##             dt  geo race ethnicity industry education variable tsid value
##  1: 2009-01-01 1001   A0        A0       00        E0      Emp    1 11174
##  2: 2009-04-01 1001   A0        A0       00        E0      Emp    1 11307
##  3: 2009-07-01 1001   A0        A0       00        E0      Emp    1 11101
##  4: 2009-10-01 1001   A0        A0       00        E0      Emp    1 11064
##  5: 2010-01-01 1001   A0        A0       00        E0      Emp    1 10908
##  6: 2010-04-01 1001   A0        A0       00        E0      Emp    1 11178
##  7: 2010-07-01 1001   A0        A0       00        E0      Emp    1 10991
##  8: 2010-10-01 1001   A0        A0       00        E0      Emp    1 11096
##  9: 2011-01-01 1001   A0        A0       00        E0      Emp    1 10824
## 10: 2011-04-01 1001   A0        A0       00        E0      Emp    1 11279
##     value_adj
##  1:  11271.93
##  2:  11157.58
##  3:  11172.11
##  4:  11049.40
##  5:  11019.77
##  6:  11028.59
##  7:  11048.17
##  8:  11033.57
##  9:  10980.97
## 10:  11127.88

Here is an example of raw time series vs. adjusted time series:

dat = qwi_adj[geo == 1001 & race == 'A0' & 
            ethnicity == 'A0' & education == 'E0' & 
            industry == '00',c('dt','value','value_adj')]
dat$dt = as.Date(dat$dt)
ggplot(dat, aes(x = dt)) +
  geom_line(aes(y = value), color ="darkred", size = 0.7) +
  geom_line(aes(y = value_adj), color="steelblue", size=1) +
  labs(title="Raw Data vs. Seasonality Adjusted Data", 
       y="Total Employment", 
       x="Year") +
  geom_text(aes(x = as.Date("2014-05-07"), y = 11430, label = "Raw Series"), color = 'darkred') +
  geom_text(aes(x = as.Date("2015-12-14"), y = 11200, label = "Adjusted Series"), color = 'steelblue')

Step 3: Reshape The Data

Right now all the raw time series (i.e., Emp for total employment, EarnS for earnings, etc.) are stacked together, and all the adjusted time series are stacked together. The chunk below unstack the data for us, making Emp, Emp_adjusted, EarnS, EarnS_adjusted, etc. each occupies a column:

qwiLong = melt(qwi_adj, id.vars = c('dt','geo','race','ethnicity','industry','education','variable'), 
               variable.name = 'adj', measure.vars = c('value', 'value_adj', 'tsid'))
qwiLong[adj == 'value', adj := '']
qwiLong[adj == 'value_adj', adj := '_adjusted']
qwiLong[adj == 'tsid', adj := '_tsid']
qwiLong = qwiLong[, vars := paste0(variable, adj)]
qwiLong = qwiLong[, c("dt","geo","race","ethnicity","industry","education","vars","value"), with = F]
qwiWide = dcast(qwiLong, dt + geo + race + ethnicity + industry + education ~ vars, value.var = 'value')
qwiWide = qwiWide[, c("year","month","date") := tstrsplit(dt, "-")]
qwiWide$year = as.numeric(qwiWide$year)
qwiWide = subset(qwiWide, select = -c(month,date))
head(qwiWide, 10)
##             dt  geo race ethnicity industry education   Emp Emp_adjusted
##  1: 2009-01-01 1001   A0        A0       00        E0 11174    11271.925
##  2: 2009-01-01 1003   A0        A0       00        E0 56894    59145.730
##  3: 2009-01-01 1005   A0        A0       00        E0  9371     9371.000
##  4: 2009-01-01 1007   A0        A0       00        E0  3604     3598.512
##  5: 2009-01-01 1009   A0        A0       00        E0  8500     8542.074
##  6: 2009-01-01 1011   A0        A0       00        E0  2960     3037.754
##  7: 2009-01-01 1013   A0        A0       00        E0  6203     6280.887
##  8: 2009-01-01 1015   A0        A0       00        E0 42561    42954.325
##  9: 2009-01-01 1017   A0        A0       00        E0  7083     7145.445
## 10: 2009-01-01 1019   A0        A0       00        E0  4872     4939.804
##     Emp_tsid year
##  1:        1 2009
##  2:        2 2009
##  3:        3 2009
##  4:        4 2009
##  5:        5 2009
##  6:        6 2009
##  7:        7 2009
##  8:        8 2009
##  9:        9 2009
## 10:       10 2009

Note: in the sample output above, we only see Emp and Emp_adjusted but not the other time series variables. This is because we only run the code on a small subset of QWI since this is just a demonstration. Should we pass the entire QWI to the chunks above, we will obtain many more co-variates in the output.