qrz/vendor/github.com/lib/pq/copy.go

342 lines
7.2 KiB
Go
Raw Normal View History

2020-06-01 14:34:37 +02:00
package pq
import (
2022-07-16 11:43:41 +02:00
"context"
2020-06-01 14:34:37 +02:00
"database/sql/driver"
"encoding/binary"
"errors"
"fmt"
"sync"
)
var (
errCopyInClosed = errors.New("pq: copyin statement has already been closed")
errBinaryCopyNotSupported = errors.New("pq: only text format supported for COPY")
errCopyToNotSupported = errors.New("pq: COPY TO is not supported")
errCopyNotSupportedOutsideTxn = errors.New("pq: COPY is only allowed inside a transaction")
errCopyInProgress = errors.New("pq: COPY in progress")
)
// CopyIn creates a COPY FROM statement which can be prepared with
// Tx.Prepare(). The target table should be visible in search_path.
func CopyIn(table string, columns ...string) string {
stmt := "COPY " + QuoteIdentifier(table) + " ("
for i, col := range columns {
if i != 0 {
stmt += ", "
}
stmt += QuoteIdentifier(col)
}
stmt += ") FROM STDIN"
return stmt
}
// CopyInSchema creates a COPY FROM statement which can be prepared with
// Tx.Prepare().
func CopyInSchema(schema, table string, columns ...string) string {
stmt := "COPY " + QuoteIdentifier(schema) + "." + QuoteIdentifier(table) + " ("
for i, col := range columns {
if i != 0 {
stmt += ", "
}
stmt += QuoteIdentifier(col)
}
stmt += ") FROM STDIN"
return stmt
}
type copyin struct {
cn *conn
buffer []byte
rowData chan []byte
done chan bool
closed bool
2022-03-26 12:13:52 +01:00
mu struct {
sync.Mutex
err error
driver.Result
}
2020-06-01 14:34:37 +02:00
}
const ciBufferSize = 64 * 1024
// flush buffer before the buffer is filled up and needs reallocation
const ciBufferFlushSize = 63 * 1024
func (cn *conn) prepareCopyIn(q string) (_ driver.Stmt, err error) {
if !cn.isInTransaction() {
return nil, errCopyNotSupportedOutsideTxn
}
ci := &copyin{
cn: cn,
buffer: make([]byte, 0, ciBufferSize),
rowData: make(chan []byte),
done: make(chan bool, 1),
}
// add CopyData identifier + 4 bytes for message length
ci.buffer = append(ci.buffer, 'd', 0, 0, 0, 0)
b := cn.writeBuf('Q')
b.string(q)
cn.send(b)
awaitCopyInResponse:
for {
t, r := cn.recv1()
switch t {
case 'G':
if r.byte() != 0 {
err = errBinaryCopyNotSupported
break awaitCopyInResponse
}
go ci.resploop()
return ci, nil
case 'H':
err = errCopyToNotSupported
break awaitCopyInResponse
case 'E':
err = parseError(r)
case 'Z':
if err == nil {
2022-03-26 12:13:52 +01:00
ci.setBad(driver.ErrBadConn)
2020-06-01 14:34:37 +02:00
errorf("unexpected ReadyForQuery in response to COPY")
}
cn.processReadyForQuery(r)
return nil, err
default:
2022-03-26 12:13:52 +01:00
ci.setBad(driver.ErrBadConn)
2020-06-01 14:34:37 +02:00
errorf("unknown response for copy query: %q", t)
}
}
// something went wrong, abort COPY before we return
b = cn.writeBuf('f')
b.string(err.Error())
cn.send(b)
for {
t, r := cn.recv1()
switch t {
case 'c', 'C', 'E':
case 'Z':
// correctly aborted, we're done
cn.processReadyForQuery(r)
return nil, err
default:
2022-03-26 12:13:52 +01:00
ci.setBad(driver.ErrBadConn)
2020-06-01 14:34:37 +02:00
errorf("unknown response for CopyFail: %q", t)
}
}
}
func (ci *copyin) flush(buf []byte) {
// set message length (without message identifier)
binary.BigEndian.PutUint32(buf[1:], uint32(len(buf)-1))
_, err := ci.cn.c.Write(buf)
if err != nil {
panic(err)
}
}
func (ci *copyin) resploop() {
for {
var r readBuf
t, err := ci.cn.recvMessage(&r)
if err != nil {
2022-03-26 12:13:52 +01:00
ci.setBad(driver.ErrBadConn)
2020-06-01 14:34:37 +02:00
ci.setError(err)
ci.done <- true
return
}
switch t {
case 'C':
// complete
2020-12-05 17:36:50 +01:00
res, _ := ci.cn.parseComplete(r.string())
ci.setResult(res)
2020-06-01 14:34:37 +02:00
case 'N':
if n := ci.cn.noticeHandler; n != nil {
n(parseError(&r))
}
case 'Z':
ci.cn.processReadyForQuery(&r)
ci.done <- true
return
case 'E':
err := parseError(&r)
ci.setError(err)
default:
2022-03-26 12:13:52 +01:00
ci.setBad(driver.ErrBadConn)
2020-06-01 14:34:37 +02:00
ci.setError(fmt.Errorf("unknown response during CopyIn: %q", t))
ci.done <- true
return
}
}
}
2022-03-26 12:13:52 +01:00
func (ci *copyin) setBad(err error) {
ci.cn.err.set(err)
2020-06-01 14:34:37 +02:00
}
2022-03-26 12:13:52 +01:00
func (ci *copyin) getBad() error {
return ci.cn.err.get()
2020-06-01 14:34:37 +02:00
}
2022-03-26 12:13:52 +01:00
func (ci *copyin) err() error {
ci.mu.Lock()
err := ci.mu.err
ci.mu.Unlock()
return err
2020-06-01 14:34:37 +02:00
}
// setError() sets ci.err if one has not been set already. Caller must not be
// holding ci.Mutex.
func (ci *copyin) setError(err error) {
2022-03-26 12:13:52 +01:00
ci.mu.Lock()
if ci.mu.err == nil {
ci.mu.err = err
2020-06-01 14:34:37 +02:00
}
2022-03-26 12:13:52 +01:00
ci.mu.Unlock()
2020-06-01 14:34:37 +02:00
}
2020-12-05 17:36:50 +01:00
func (ci *copyin) setResult(result driver.Result) {
2022-03-26 12:13:52 +01:00
ci.mu.Lock()
ci.mu.Result = result
ci.mu.Unlock()
2020-12-05 17:36:50 +01:00
}
func (ci *copyin) getResult() driver.Result {
2022-03-26 12:13:52 +01:00
ci.mu.Lock()
result := ci.mu.Result
ci.mu.Unlock()
2020-12-05 17:36:50 +01:00
if result == nil {
return driver.RowsAffected(0)
}
return result
}
2020-06-01 14:34:37 +02:00
func (ci *copyin) NumInput() int {
return -1
}
func (ci *copyin) Query(v []driver.Value) (r driver.Rows, err error) {
return nil, ErrNotSupported
}
// Exec inserts values into the COPY stream. The insert is asynchronous
// and Exec can return errors from previous Exec calls to the same
// COPY stmt.
//
// You need to call Exec(nil) to sync the COPY stream and to get any
// errors from pending data, since Stmt.Close() doesn't return errors
// to the user.
func (ci *copyin) Exec(v []driver.Value) (r driver.Result, err error) {
if ci.closed {
return nil, errCopyInClosed
}
2022-03-26 12:13:52 +01:00
if err := ci.getBad(); err != nil {
return nil, err
2020-06-01 14:34:37 +02:00
}
defer ci.cn.errRecover(&err)
2022-03-26 12:13:52 +01:00
if err := ci.err(); err != nil {
return nil, err
2020-06-01 14:34:37 +02:00
}
if len(v) == 0 {
2020-12-05 17:36:50 +01:00
if err := ci.Close(); err != nil {
return driver.RowsAffected(0), err
}
return ci.getResult(), nil
2020-06-01 14:34:37 +02:00
}
numValues := len(v)
for i, value := range v {
ci.buffer = appendEncodedText(&ci.cn.parameterStatus, ci.buffer, value)
if i < numValues-1 {
ci.buffer = append(ci.buffer, '\t')
}
}
ci.buffer = append(ci.buffer, '\n')
if len(ci.buffer) > ciBufferFlushSize {
ci.flush(ci.buffer)
// reset buffer, keep bytes for message identifier and length
ci.buffer = ci.buffer[:5]
}
return driver.RowsAffected(0), nil
}
2022-07-16 11:43:41 +02:00
// CopyData inserts a raw string into the COPY stream. The insert is
// asynchronous and CopyData can return errors from previous CopyData calls to
// the same COPY stmt.
//
// You need to call Exec(nil) to sync the COPY stream and to get any
// errors from pending data, since Stmt.Close() doesn't return errors
// to the user.
func (ci *copyin) CopyData(ctx context.Context, line string) (r driver.Result, err error) {
if ci.closed {
return nil, errCopyInClosed
}
if finish := ci.cn.watchCancel(ctx); finish != nil {
defer finish()
}
if err := ci.getBad(); err != nil {
return nil, err
}
defer ci.cn.errRecover(&err)
if err := ci.err(); err != nil {
return nil, err
}
ci.buffer = append(ci.buffer, []byte(line)...)
ci.buffer = append(ci.buffer, '\n')
if len(ci.buffer) > ciBufferFlushSize {
ci.flush(ci.buffer)
// reset buffer, keep bytes for message identifier and length
ci.buffer = ci.buffer[:5]
}
return driver.RowsAffected(0), nil
}
2020-06-01 14:34:37 +02:00
func (ci *copyin) Close() (err error) {
if ci.closed { // Don't do anything, we're already closed
return nil
}
ci.closed = true
2022-03-26 12:13:52 +01:00
if err := ci.getBad(); err != nil {
return err
2020-06-01 14:34:37 +02:00
}
defer ci.cn.errRecover(&err)
if len(ci.buffer) > 0 {
ci.flush(ci.buffer)
}
// Avoid touching the scratch buffer as resploop could be using it.
err = ci.cn.sendSimpleMessage('c')
if err != nil {
return err
}
<-ci.done
ci.cn.inCopy = false
2022-03-26 12:13:52 +01:00
if err := ci.err(); err != nil {
2020-06-01 14:34:37 +02:00
return err
}
return nil
}