forked from Shopify/ghostferry
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrow_batch.go
88 lines (70 loc) · 1.95 KB
/
row_batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package ghostferry
import (
"encoding/json"
"strings"
)
type RowBatch struct {
values []RowData
paginationKeyIndex int
table *TableSchema
fingerprints map[uint64][]byte
columns []string
}
func NewRowBatch(table *TableSchema, values []RowData, paginationKeyIndex int) *RowBatch {
return &RowBatch{
values: values,
paginationKeyIndex: paginationKeyIndex,
table: table,
columns: ConvertTableColumnsToStrings(table.Columns),
}
}
func (e *RowBatch) Values() []RowData {
return e.values
}
func (e *RowBatch) EstimateByteSize() uint64 {
var total int
for _, v := range e.values {
size, err := json.Marshal(v)
if err != nil {
continue
}
total += len(size)
}
return uint64(total)
}
func (e *RowBatch) PaginationKeyIndex() int {
return e.paginationKeyIndex
}
func (e *RowBatch) ValuesContainPaginationKey() bool {
return e.paginationKeyIndex >= 0
}
func (e *RowBatch) Size() int {
return len(e.values)
}
func (e *RowBatch) TableSchema() *TableSchema {
return e.table
}
func (e *RowBatch) Fingerprints() map[uint64][]byte {
return e.fingerprints
}
func (e *RowBatch) AsSQLQuery(schemaName, tableName string) (string, []interface{}, error) {
if err := verifyValuesHasTheSameLengthAsColumns(e.table, e.values...); err != nil {
return "", nil, err
}
valuesStr := "(" + strings.Repeat("?,", len(e.columns)-1) + "?)"
valuesStr = strings.Repeat(valuesStr+",", len(e.values)-1) + valuesStr
query := "INSERT IGNORE INTO " +
QuotedTableNameFromString(schemaName, tableName) +
" (" + strings.Join(QuoteFields(e.columns), ",") + ") VALUES " + valuesStr
return query, e.flattenRowData(), nil
}
func (e *RowBatch) flattenRowData() []interface{} {
rowSize := len(e.values[0])
flattened := make([]interface{}, rowSize*len(e.values))
for rowIdx, row := range e.values {
for colIdx, col := range row {
flattened[rowIdx*rowSize+colIdx] = col
}
}
return flattened
}