1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package sql
17
18 import (
19 "context"
20 "database/sql/driver"
21 "errors"
22 "fmt"
23 "io"
24 "math/rand/v2"
25 "reflect"
26 "runtime"
27 "slices"
28 "strconv"
29 "sync"
30 "sync/atomic"
31 "time"
32 _ "unsafe"
33 )
34
35 var driversMu sync.RWMutex
36
37
38
39
40
41
42
43
44
45
46
47 var drivers = make(map[string]driver.Driver)
48
49
50 var nowFunc = time.Now
51
52
53
54
55 func Register(name string, driver driver.Driver) {
56 driversMu.Lock()
57 defer driversMu.Unlock()
58 if driver == nil {
59 panic("sql: Register driver is nil")
60 }
61 if _, dup := drivers[name]; dup {
62 panic("sql: Register called twice for driver " + name)
63 }
64 drivers[name] = driver
65 }
66
67 func unregisterAllDrivers() {
68 driversMu.Lock()
69 defer driversMu.Unlock()
70
71 drivers = make(map[string]driver.Driver)
72 }
73
74
75 func Drivers() []string {
76 driversMu.RLock()
77 defer driversMu.RUnlock()
78 list := make([]string, 0, len(drivers))
79 for name := range drivers {
80 list = append(list, name)
81 }
82 slices.Sort(list)
83 return list
84 }
85
86
87
88
89
90
91
92 type NamedArg struct {
93 _NamedFieldsRequired struct{}
94
95
96
97
98
99
100
101 Name string
102
103
104
105
106 Value any
107 }
108
109
110
111
112
113
114
115
116
117
118
119
120
121 func Named(name string, value any) NamedArg {
122
123
124
125
126 return NamedArg{Name: name, Value: value}
127 }
128
129
130 type IsolationLevel int
131
132
133
134
135
136 const (
137 LevelDefault IsolationLevel = iota
138 LevelReadUncommitted
139 LevelReadCommitted
140 LevelWriteCommitted
141 LevelRepeatableRead
142 LevelSnapshot
143 LevelSerializable
144 LevelLinearizable
145 )
146
147
148 func (i IsolationLevel) String() string {
149 switch i {
150 case LevelDefault:
151 return "Default"
152 case LevelReadUncommitted:
153 return "Read Uncommitted"
154 case LevelReadCommitted:
155 return "Read Committed"
156 case LevelWriteCommitted:
157 return "Write Committed"
158 case LevelRepeatableRead:
159 return "Repeatable Read"
160 case LevelSnapshot:
161 return "Snapshot"
162 case LevelSerializable:
163 return "Serializable"
164 case LevelLinearizable:
165 return "Linearizable"
166 default:
167 return "IsolationLevel(" + strconv.Itoa(int(i)) + ")"
168 }
169 }
170
171 var _ fmt.Stringer = LevelDefault
172
173
174 type TxOptions struct {
175
176
177 Isolation IsolationLevel
178 ReadOnly bool
179 }
180
181
182
183
184 type RawBytes []byte
185
186
187
188
189
190
191
192
193
194
195
196
197
198 type NullString struct {
199 String string
200 Valid bool
201 }
202
203
204 func (ns *NullString) Scan(value any) error {
205 if value == nil {
206 ns.String, ns.Valid = "", false
207 return nil
208 }
209 ns.Valid = true
210 return convertAssign(&ns.String, value)
211 }
212
213
214 func (ns NullString) Value() (driver.Value, error) {
215 if !ns.Valid {
216 return nil, nil
217 }
218 return ns.String, nil
219 }
220
221
222
223
224 type NullInt64 struct {
225 Int64 int64
226 Valid bool
227 }
228
229
230 func (n *NullInt64) Scan(value any) error {
231 if value == nil {
232 n.Int64, n.Valid = 0, false
233 return nil
234 }
235 n.Valid = true
236 return convertAssign(&n.Int64, value)
237 }
238
239
240 func (n NullInt64) Value() (driver.Value, error) {
241 if !n.Valid {
242 return nil, nil
243 }
244 return n.Int64, nil
245 }
246
247
248
249
250 type NullInt32 struct {
251 Int32 int32
252 Valid bool
253 }
254
255
256 func (n *NullInt32) Scan(value any) error {
257 if value == nil {
258 n.Int32, n.Valid = 0, false
259 return nil
260 }
261 n.Valid = true
262 return convertAssign(&n.Int32, value)
263 }
264
265
266 func (n NullInt32) Value() (driver.Value, error) {
267 if !n.Valid {
268 return nil, nil
269 }
270 return int64(n.Int32), nil
271 }
272
273
274
275
276 type NullInt16 struct {
277 Int16 int16
278 Valid bool
279 }
280
281
282 func (n *NullInt16) Scan(value any) error {
283 if value == nil {
284 n.Int16, n.Valid = 0, false
285 return nil
286 }
287 err := convertAssign(&n.Int16, value)
288 n.Valid = err == nil
289 return err
290 }
291
292
293 func (n NullInt16) Value() (driver.Value, error) {
294 if !n.Valid {
295 return nil, nil
296 }
297 return int64(n.Int16), nil
298 }
299
300
301
302
303 type NullByte struct {
304 Byte byte
305 Valid bool
306 }
307
308
309 func (n *NullByte) Scan(value any) error {
310 if value == nil {
311 n.Byte, n.Valid = 0, false
312 return nil
313 }
314 err := convertAssign(&n.Byte, value)
315 n.Valid = err == nil
316 return err
317 }
318
319
320 func (n NullByte) Value() (driver.Value, error) {
321 if !n.Valid {
322 return nil, nil
323 }
324 return int64(n.Byte), nil
325 }
326
327
328
329
330 type NullFloat64 struct {
331 Float64 float64
332 Valid bool
333 }
334
335
336 func (n *NullFloat64) Scan(value any) error {
337 if value == nil {
338 n.Float64, n.Valid = 0, false
339 return nil
340 }
341 n.Valid = true
342 return convertAssign(&n.Float64, value)
343 }
344
345
346 func (n NullFloat64) Value() (driver.Value, error) {
347 if !n.Valid {
348 return nil, nil
349 }
350 return n.Float64, nil
351 }
352
353
354
355
356 type NullBool struct {
357 Bool bool
358 Valid bool
359 }
360
361
362 func (n *NullBool) Scan(value any) error {
363 if value == nil {
364 n.Bool, n.Valid = false, false
365 return nil
366 }
367 n.Valid = true
368 return convertAssign(&n.Bool, value)
369 }
370
371
372 func (n NullBool) Value() (driver.Value, error) {
373 if !n.Valid {
374 return nil, nil
375 }
376 return n.Bool, nil
377 }
378
379
380
381
382 type NullTime struct {
383 Time time.Time
384 Valid bool
385 }
386
387
388 func (n *NullTime) Scan(value any) error {
389 if value == nil {
390 n.Time, n.Valid = time.Time{}, false
391 return nil
392 }
393 n.Valid = true
394 return convertAssign(&n.Time, value)
395 }
396
397
398 func (n NullTime) Value() (driver.Value, error) {
399 if !n.Valid {
400 return nil, nil
401 }
402 return n.Time, nil
403 }
404
405
406
407
408
409
410
411
412
413
414
415
416
417 type Null[T any] struct {
418 V T
419 Valid bool
420 }
421
422 func (n *Null[T]) Scan(value any) error {
423 if value == nil {
424 n.V, n.Valid = *new(T), false
425 return nil
426 }
427 n.Valid = true
428 return convertAssign(&n.V, value)
429 }
430
431 func (n Null[T]) Value() (driver.Value, error) {
432 if !n.Valid {
433 return nil, nil
434 }
435 return n.V, nil
436 }
437
438
439 type Scanner interface {
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458 Scan(src any) error
459 }
460
461
462
463
464
465
466
467
468
469 type Out struct {
470 _NamedFieldsRequired struct{}
471
472
473
474 Dest any
475
476
477
478
479 In bool
480 }
481
482
483
484
485 var ErrNoRows = errors.New("sql: no rows in result set")
486
487
488
489
490
491
492
493
494
495
496
497
498
499 type DB struct {
500
501 waitDuration atomic.Int64
502
503 connector driver.Connector
504
505
506
507 numClosed atomic.Uint64
508
509 mu sync.Mutex
510 freeConn []*driverConn
511 connRequests connRequestSet
512 numOpen int
513
514
515
516
517
518 openerCh chan struct{}
519 closed bool
520 dep map[finalCloser]depSet
521 lastPut map[*driverConn]string
522 maxIdleCount int
523 maxOpen int
524 maxLifetime time.Duration
525 maxIdleTime time.Duration
526 cleanerCh chan struct{}
527 waitCount int64
528 maxIdleClosed int64
529 maxIdleTimeClosed int64
530 maxLifetimeClosed int64
531
532 stop func()
533 }
534
535
536 type connReuseStrategy uint8
537
538 const (
539
540 alwaysNewConn connReuseStrategy = iota
541
542
543
544 cachedOrNewConn
545 )
546
547
548
549
550
551 type driverConn struct {
552 db *DB
553 createdAt time.Time
554
555 sync.Mutex
556 ci driver.Conn
557 needReset bool
558 closed bool
559 finalClosed bool
560 openStmt map[*driverStmt]bool
561
562
563 inUse bool
564 dbmuClosed bool
565 returnedAt time.Time
566 onPut []func()
567 }
568
569 func (dc *driverConn) releaseConn(err error) {
570 dc.db.putConn(dc, err, true)
571 }
572
573 func (dc *driverConn) removeOpenStmt(ds *driverStmt) {
574 dc.Lock()
575 defer dc.Unlock()
576 delete(dc.openStmt, ds)
577 }
578
579 func (dc *driverConn) expired(timeout time.Duration) bool {
580 if timeout <= 0 {
581 return false
582 }
583 return dc.createdAt.Add(timeout).Before(nowFunc())
584 }
585
586
587
588 func (dc *driverConn) resetSession(ctx context.Context) error {
589 dc.Lock()
590 defer dc.Unlock()
591
592 if !dc.needReset {
593 return nil
594 }
595 if cr, ok := dc.ci.(driver.SessionResetter); ok {
596 return cr.ResetSession(ctx)
597 }
598 return nil
599 }
600
601
602
603 func (dc *driverConn) validateConnection(needsReset bool) bool {
604 dc.Lock()
605 defer dc.Unlock()
606
607 if needsReset {
608 dc.needReset = true
609 }
610 if cv, ok := dc.ci.(driver.Validator); ok {
611 return cv.IsValid()
612 }
613 return true
614 }
615
616
617
618 func (dc *driverConn) prepareLocked(ctx context.Context, cg stmtConnGrabber, query string) (*driverStmt, error) {
619 si, err := ctxDriverPrepare(ctx, dc.ci, query)
620 if err != nil {
621 return nil, err
622 }
623 ds := &driverStmt{Locker: dc, si: si}
624
625
626 if cg != nil {
627 return ds, nil
628 }
629
630
631
632
633
634 if dc.openStmt == nil {
635 dc.openStmt = make(map[*driverStmt]bool)
636 }
637 dc.openStmt[ds] = true
638 return ds, nil
639 }
640
641
642 func (dc *driverConn) closeDBLocked() func() error {
643 dc.Lock()
644 defer dc.Unlock()
645 if dc.closed {
646 return func() error { return errors.New("sql: duplicate driverConn close") }
647 }
648 dc.closed = true
649 return dc.db.removeDepLocked(dc, dc)
650 }
651
652 func (dc *driverConn) Close() error {
653 dc.Lock()
654 if dc.closed {
655 dc.Unlock()
656 return errors.New("sql: duplicate driverConn close")
657 }
658 dc.closed = true
659 dc.Unlock()
660
661
662 dc.db.mu.Lock()
663 dc.dbmuClosed = true
664 fn := dc.db.removeDepLocked(dc, dc)
665 dc.db.mu.Unlock()
666 return fn()
667 }
668
669 func (dc *driverConn) finalClose() error {
670 var err error
671
672
673
674 var openStmt []*driverStmt
675 withLock(dc, func() {
676 openStmt = make([]*driverStmt, 0, len(dc.openStmt))
677 for ds := range dc.openStmt {
678 openStmt = append(openStmt, ds)
679 }
680 dc.openStmt = nil
681 })
682 for _, ds := range openStmt {
683 ds.Close()
684 }
685 withLock(dc, func() {
686 dc.finalClosed = true
687 err = dc.ci.Close()
688 dc.ci = nil
689 })
690
691 dc.db.mu.Lock()
692 dc.db.numOpen--
693 dc.db.maybeOpenNewConnections()
694 dc.db.mu.Unlock()
695
696 dc.db.numClosed.Add(1)
697 return err
698 }
699
700
701
702
703 type driverStmt struct {
704 sync.Locker
705 si driver.Stmt
706 closed bool
707 closeErr error
708 }
709
710
711
712 func (ds *driverStmt) Close() error {
713 ds.Lock()
714 defer ds.Unlock()
715 if ds.closed {
716 return ds.closeErr
717 }
718 ds.closed = true
719 ds.closeErr = ds.si.Close()
720 return ds.closeErr
721 }
722
723
724 type depSet map[any]bool
725
726
727
728 type finalCloser interface {
729
730
731 finalClose() error
732 }
733
734
735
736 func (db *DB) addDep(x finalCloser, dep any) {
737 db.mu.Lock()
738 defer db.mu.Unlock()
739 db.addDepLocked(x, dep)
740 }
741
742 func (db *DB) addDepLocked(x finalCloser, dep any) {
743 if db.dep == nil {
744 db.dep = make(map[finalCloser]depSet)
745 }
746 xdep := db.dep[x]
747 if xdep == nil {
748 xdep = make(depSet)
749 db.dep[x] = xdep
750 }
751 xdep[dep] = true
752 }
753
754
755
756
757
758 func (db *DB) removeDep(x finalCloser, dep any) error {
759 db.mu.Lock()
760 fn := db.removeDepLocked(x, dep)
761 db.mu.Unlock()
762 return fn()
763 }
764
765 func (db *DB) removeDepLocked(x finalCloser, dep any) func() error {
766 xdep, ok := db.dep[x]
767 if !ok {
768 panic(fmt.Sprintf("unpaired removeDep: no deps for %T", x))
769 }
770
771 l0 := len(xdep)
772 delete(xdep, dep)
773
774 switch len(xdep) {
775 case l0:
776
777 panic(fmt.Sprintf("unpaired removeDep: no %T dep on %T", dep, x))
778 case 0:
779
780 delete(db.dep, x)
781 return x.finalClose
782 default:
783
784 return func() error { return nil }
785 }
786 }
787
788
789
790
791
792
793 var connectionRequestQueueSize = 1000000
794
795 type dsnConnector struct {
796 dsn string
797 driver driver.Driver
798 }
799
800 func (t dsnConnector) Connect(_ context.Context) (driver.Conn, error) {
801 return t.driver.Open(t.dsn)
802 }
803
804 func (t dsnConnector) Driver() driver.Driver {
805 return t.driver
806 }
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824 func OpenDB(c driver.Connector) *DB {
825 ctx, cancel := context.WithCancel(context.Background())
826 db := &DB{
827 connector: c,
828 openerCh: make(chan struct{}, connectionRequestQueueSize),
829 lastPut: make(map[*driverConn]string),
830 stop: cancel,
831 }
832
833 go db.connectionOpener(ctx)
834
835 return db
836 }
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855 func Open(driverName, dataSourceName string) (*DB, error) {
856 driversMu.RLock()
857 driveri, ok := drivers[driverName]
858 driversMu.RUnlock()
859 if !ok {
860 return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
861 }
862
863 if driverCtx, ok := driveri.(driver.DriverContext); ok {
864 connector, err := driverCtx.OpenConnector(dataSourceName)
865 if err != nil {
866 return nil, err
867 }
868 return OpenDB(connector), nil
869 }
870
871 return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil
872 }
873
874 func (db *DB) pingDC(ctx context.Context, dc *driverConn, release func(error)) error {
875 var err error
876 if pinger, ok := dc.ci.(driver.Pinger); ok {
877 withLock(dc, func() {
878 err = pinger.Ping(ctx)
879 })
880 }
881 release(err)
882 return err
883 }
884
885
886
887 func (db *DB) PingContext(ctx context.Context) error {
888 var dc *driverConn
889 var err error
890
891 err = db.retry(func(strategy connReuseStrategy) error {
892 dc, err = db.conn(ctx, strategy)
893 return err
894 })
895
896 if err != nil {
897 return err
898 }
899
900 return db.pingDC(ctx, dc, dc.releaseConn)
901 }
902
903
904
905
906
907
908 func (db *DB) Ping() error {
909 return db.PingContext(context.Background())
910 }
911
912
913
914
915
916
917
918 func (db *DB) Close() error {
919 db.mu.Lock()
920 if db.closed {
921 db.mu.Unlock()
922 return nil
923 }
924 if db.cleanerCh != nil {
925 close(db.cleanerCh)
926 }
927 var err error
928 fns := make([]func() error, 0, len(db.freeConn))
929 for _, dc := range db.freeConn {
930 fns = append(fns, dc.closeDBLocked())
931 }
932 db.freeConn = nil
933 db.closed = true
934 db.connRequests.CloseAndRemoveAll()
935 db.mu.Unlock()
936 for _, fn := range fns {
937 err1 := fn()
938 if err1 != nil {
939 err = err1
940 }
941 }
942 db.stop()
943 if c, ok := db.connector.(io.Closer); ok {
944 err1 := c.Close()
945 if err1 != nil {
946 err = err1
947 }
948 }
949 return err
950 }
951
952 const defaultMaxIdleConns = 2
953
954 func (db *DB) maxIdleConnsLocked() int {
955 n := db.maxIdleCount
956 switch {
957 case n == 0:
958
959 return defaultMaxIdleConns
960 case n < 0:
961 return 0
962 default:
963 return n
964 }
965 }
966
967 func (db *DB) shortestIdleTimeLocked() time.Duration {
968 if db.maxIdleTime <= 0 {
969 return db.maxLifetime
970 }
971 if db.maxLifetime <= 0 {
972 return db.maxIdleTime
973 }
974 return min(db.maxIdleTime, db.maxLifetime)
975 }
976
977
978
979
980
981
982
983
984
985
986
987 func (db *DB) SetMaxIdleConns(n int) {
988 db.mu.Lock()
989 if n > 0 {
990 db.maxIdleCount = n
991 } else {
992
993 db.maxIdleCount = -1
994 }
995
996 if db.maxOpen > 0 && db.maxIdleConnsLocked() > db.maxOpen {
997 db.maxIdleCount = db.maxOpen
998 }
999 var closing []*driverConn
1000 idleCount := len(db.freeConn)
1001 maxIdle := db.maxIdleConnsLocked()
1002 if idleCount > maxIdle {
1003 closing = db.freeConn[maxIdle:]
1004 db.freeConn = db.freeConn[:maxIdle]
1005 }
1006 db.maxIdleClosed += int64(len(closing))
1007 db.mu.Unlock()
1008 for _, c := range closing {
1009 c.Close()
1010 }
1011 }
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021 func (db *DB) SetMaxOpenConns(n int) {
1022 db.mu.Lock()
1023 db.maxOpen = n
1024 if n < 0 {
1025 db.maxOpen = 0
1026 }
1027 syncMaxIdle := db.maxOpen > 0 && db.maxIdleConnsLocked() > db.maxOpen
1028 db.mu.Unlock()
1029 if syncMaxIdle {
1030 db.SetMaxIdleConns(n)
1031 }
1032 }
1033
1034
1035
1036
1037
1038
1039 func (db *DB) SetConnMaxLifetime(d time.Duration) {
1040 if d < 0 {
1041 d = 0
1042 }
1043 db.mu.Lock()
1044
1045 if d > 0 && d < db.maxLifetime && db.cleanerCh != nil {
1046 select {
1047 case db.cleanerCh <- struct{}{}:
1048 default:
1049 }
1050 }
1051 db.maxLifetime = d
1052 db.startCleanerLocked()
1053 db.mu.Unlock()
1054 }
1055
1056
1057
1058
1059
1060
1061 func (db *DB) SetConnMaxIdleTime(d time.Duration) {
1062 if d < 0 {
1063 d = 0
1064 }
1065 db.mu.Lock()
1066 defer db.mu.Unlock()
1067
1068
1069 if d > 0 && d < db.maxIdleTime && db.cleanerCh != nil {
1070 select {
1071 case db.cleanerCh <- struct{}{}:
1072 default:
1073 }
1074 }
1075 db.maxIdleTime = d
1076 db.startCleanerLocked()
1077 }
1078
1079
1080 func (db *DB) startCleanerLocked() {
1081 if (db.maxLifetime > 0 || db.maxIdleTime > 0) && db.numOpen > 0 && db.cleanerCh == nil {
1082 db.cleanerCh = make(chan struct{}, 1)
1083 go db.connectionCleaner(db.shortestIdleTimeLocked())
1084 }
1085 }
1086
1087 func (db *DB) connectionCleaner(d time.Duration) {
1088 const minInterval = time.Second
1089
1090 if d < minInterval {
1091 d = minInterval
1092 }
1093 t := time.NewTimer(d)
1094
1095 for {
1096 select {
1097 case <-t.C:
1098 case <-db.cleanerCh:
1099 }
1100
1101 db.mu.Lock()
1102
1103 d = db.shortestIdleTimeLocked()
1104 if db.closed || db.numOpen == 0 || d <= 0 {
1105 db.cleanerCh = nil
1106 db.mu.Unlock()
1107 return
1108 }
1109
1110 d, closing := db.connectionCleanerRunLocked(d)
1111 db.mu.Unlock()
1112 for _, c := range closing {
1113 c.Close()
1114 }
1115
1116 if d < minInterval {
1117 d = minInterval
1118 }
1119
1120 if !t.Stop() {
1121 select {
1122 case <-t.C:
1123 default:
1124 }
1125 }
1126 t.Reset(d)
1127 }
1128 }
1129
1130
1131
1132
1133 func (db *DB) connectionCleanerRunLocked(d time.Duration) (time.Duration, []*driverConn) {
1134 var idleClosing int64
1135 var closing []*driverConn
1136 if db.maxIdleTime > 0 {
1137
1138
1139 idleSince := nowFunc().Add(-db.maxIdleTime)
1140 last := len(db.freeConn) - 1
1141 for i := last; i >= 0; i-- {
1142 c := db.freeConn[i]
1143 if c.returnedAt.Before(idleSince) {
1144 i++
1145 closing = db.freeConn[:i:i]
1146 db.freeConn = db.freeConn[i:]
1147 idleClosing = int64(len(closing))
1148 db.maxIdleTimeClosed += idleClosing
1149 break
1150 }
1151 }
1152
1153 if len(db.freeConn) > 0 {
1154 c := db.freeConn[0]
1155 if d2 := c.returnedAt.Sub(idleSince); d2 < d {
1156
1157
1158 d = d2
1159 }
1160 }
1161 }
1162
1163 if db.maxLifetime > 0 {
1164 expiredSince := nowFunc().Add(-db.maxLifetime)
1165 for i := 0; i < len(db.freeConn); i++ {
1166 c := db.freeConn[i]
1167 if c.createdAt.Before(expiredSince) {
1168 closing = append(closing, c)
1169
1170 last := len(db.freeConn) - 1
1171
1172
1173 copy(db.freeConn[i:], db.freeConn[i+1:])
1174 db.freeConn[last] = nil
1175 db.freeConn = db.freeConn[:last]
1176 i--
1177 } else if d2 := c.createdAt.Sub(expiredSince); d2 < d {
1178
1179
1180 d = d2
1181 }
1182 }
1183 db.maxLifetimeClosed += int64(len(closing)) - idleClosing
1184 }
1185
1186 return d, closing
1187 }
1188
1189
1190 type DBStats struct {
1191 MaxOpenConnections int
1192
1193
1194 OpenConnections int
1195 InUse int
1196 Idle int
1197
1198
1199 WaitCount int64
1200 WaitDuration time.Duration
1201 MaxIdleClosed int64
1202 MaxIdleTimeClosed int64
1203 MaxLifetimeClosed int64
1204 }
1205
1206
1207 func (db *DB) Stats() DBStats {
1208 wait := db.waitDuration.Load()
1209
1210 db.mu.Lock()
1211 defer db.mu.Unlock()
1212
1213 stats := DBStats{
1214 MaxOpenConnections: db.maxOpen,
1215
1216 Idle: len(db.freeConn),
1217 OpenConnections: db.numOpen,
1218 InUse: db.numOpen - len(db.freeConn),
1219
1220 WaitCount: db.waitCount,
1221 WaitDuration: time.Duration(wait),
1222 MaxIdleClosed: db.maxIdleClosed,
1223 MaxIdleTimeClosed: db.maxIdleTimeClosed,
1224 MaxLifetimeClosed: db.maxLifetimeClosed,
1225 }
1226 return stats
1227 }
1228
1229
1230
1231
1232 func (db *DB) maybeOpenNewConnections() {
1233 numRequests := db.connRequests.Len()
1234 if db.maxOpen > 0 {
1235 numCanOpen := db.maxOpen - db.numOpen
1236 if numRequests > numCanOpen {
1237 numRequests = numCanOpen
1238 }
1239 }
1240 for numRequests > 0 {
1241 db.numOpen++
1242 numRequests--
1243 if db.closed {
1244 return
1245 }
1246 db.openerCh <- struct{}{}
1247 }
1248 }
1249
1250
1251 func (db *DB) connectionOpener(ctx context.Context) {
1252 for {
1253 select {
1254 case <-ctx.Done():
1255 return
1256 case <-db.openerCh:
1257 db.openNewConnection(ctx)
1258 }
1259 }
1260 }
1261
1262
1263 func (db *DB) openNewConnection(ctx context.Context) {
1264
1265
1266
1267 ci, err := db.connector.Connect(ctx)
1268 db.mu.Lock()
1269 defer db.mu.Unlock()
1270 if db.closed {
1271 if err == nil {
1272 ci.Close()
1273 }
1274 db.numOpen--
1275 return
1276 }
1277 if err != nil {
1278 db.numOpen--
1279 db.putConnDBLocked(nil, err)
1280 db.maybeOpenNewConnections()
1281 return
1282 }
1283 dc := &driverConn{
1284 db: db,
1285 createdAt: nowFunc(),
1286 returnedAt: nowFunc(),
1287 ci: ci,
1288 }
1289 if db.putConnDBLocked(dc, err) {
1290 db.addDepLocked(dc, dc)
1291 } else {
1292 db.numOpen--
1293 ci.Close()
1294 }
1295 }
1296
1297
1298
1299
1300 type connRequest struct {
1301 conn *driverConn
1302 err error
1303 }
1304
1305 var errDBClosed = errors.New("sql: database is closed")
1306
1307
1308 func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
1309 db.mu.Lock()
1310 if db.closed {
1311 db.mu.Unlock()
1312 return nil, errDBClosed
1313 }
1314
1315 select {
1316 default:
1317 case <-ctx.Done():
1318 db.mu.Unlock()
1319 return nil, ctx.Err()
1320 }
1321 lifetime := db.maxLifetime
1322
1323
1324 last := len(db.freeConn) - 1
1325 if strategy == cachedOrNewConn && last >= 0 {
1326
1327
1328 conn := db.freeConn[last]
1329 db.freeConn = db.freeConn[:last]
1330 conn.inUse = true
1331 if conn.expired(lifetime) {
1332 db.maxLifetimeClosed++
1333 db.mu.Unlock()
1334 conn.Close()
1335 return nil, driver.ErrBadConn
1336 }
1337 db.mu.Unlock()
1338
1339
1340 if err := conn.resetSession(ctx); errors.Is(err, driver.ErrBadConn) {
1341 conn.Close()
1342 return nil, err
1343 }
1344
1345 return conn, nil
1346 }
1347
1348
1349
1350 if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
1351
1352
1353 req := make(chan connRequest, 1)
1354 delHandle := db.connRequests.Add(req)
1355 db.waitCount++
1356 db.mu.Unlock()
1357
1358 waitStart := nowFunc()
1359
1360
1361 select {
1362 case <-ctx.Done():
1363
1364
1365 db.mu.Lock()
1366 deleted := db.connRequests.Delete(delHandle)
1367 db.mu.Unlock()
1368
1369 db.waitDuration.Add(int64(time.Since(waitStart)))
1370
1371
1372
1373 if !deleted {
1374
1375
1376
1377
1378
1379
1380 select {
1381 default:
1382 case ret, ok := <-req:
1383 if ok && ret.conn != nil {
1384 db.putConn(ret.conn, ret.err, false)
1385 }
1386 }
1387 }
1388 return nil, ctx.Err()
1389 case ret, ok := <-req:
1390 db.waitDuration.Add(int64(time.Since(waitStart)))
1391
1392 if !ok {
1393 return nil, errDBClosed
1394 }
1395
1396
1397
1398
1399
1400
1401 if strategy == cachedOrNewConn && ret.err == nil && ret.conn.expired(lifetime) {
1402 db.mu.Lock()
1403 db.maxLifetimeClosed++
1404 db.mu.Unlock()
1405 ret.conn.Close()
1406 return nil, driver.ErrBadConn
1407 }
1408 if ret.conn == nil {
1409 return nil, ret.err
1410 }
1411
1412
1413 if err := ret.conn.resetSession(ctx); errors.Is(err, driver.ErrBadConn) {
1414 ret.conn.Close()
1415 return nil, err
1416 }
1417 return ret.conn, ret.err
1418 }
1419 }
1420
1421 db.numOpen++
1422 db.mu.Unlock()
1423 ci, err := db.connector.Connect(ctx)
1424 if err != nil {
1425 db.mu.Lock()
1426 db.numOpen--
1427 db.maybeOpenNewConnections()
1428 db.mu.Unlock()
1429 return nil, err
1430 }
1431 db.mu.Lock()
1432 dc := &driverConn{
1433 db: db,
1434 createdAt: nowFunc(),
1435 returnedAt: nowFunc(),
1436 ci: ci,
1437 inUse: true,
1438 }
1439 db.addDepLocked(dc, dc)
1440 db.mu.Unlock()
1441 return dc, nil
1442 }
1443
1444
1445 var putConnHook func(*DB, *driverConn)
1446
1447
1448
1449
1450 func (db *DB) noteUnusedDriverStatement(c *driverConn, ds *driverStmt) {
1451 db.mu.Lock()
1452 defer db.mu.Unlock()
1453 if c.inUse {
1454 c.onPut = append(c.onPut, func() {
1455 ds.Close()
1456 })
1457 } else {
1458 c.Lock()
1459 fc := c.finalClosed
1460 c.Unlock()
1461 if !fc {
1462 ds.Close()
1463 }
1464 }
1465 }
1466
1467
1468
1469 const debugGetPut = false
1470
1471
1472
1473 func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
1474 if !errors.Is(err, driver.ErrBadConn) {
1475 if !dc.validateConnection(resetSession) {
1476 err = driver.ErrBadConn
1477 }
1478 }
1479 db.mu.Lock()
1480 if !dc.inUse {
1481 db.mu.Unlock()
1482 if debugGetPut {
1483 fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
1484 }
1485 panic("sql: connection returned that was never out")
1486 }
1487
1488 if !errors.Is(err, driver.ErrBadConn) && dc.expired(db.maxLifetime) {
1489 db.maxLifetimeClosed++
1490 err = driver.ErrBadConn
1491 }
1492 if debugGetPut {
1493 db.lastPut[dc] = stack()
1494 }
1495 dc.inUse = false
1496 dc.returnedAt = nowFunc()
1497
1498 for _, fn := range dc.onPut {
1499 fn()
1500 }
1501 dc.onPut = nil
1502
1503 if errors.Is(err, driver.ErrBadConn) {
1504
1505
1506
1507
1508 db.maybeOpenNewConnections()
1509 db.mu.Unlock()
1510 dc.Close()
1511 return
1512 }
1513 if putConnHook != nil {
1514 putConnHook(db, dc)
1515 }
1516 added := db.putConnDBLocked(dc, nil)
1517 db.mu.Unlock()
1518
1519 if !added {
1520 dc.Close()
1521 return
1522 }
1523 }
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534 func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
1535 if db.closed {
1536 return false
1537 }
1538 if db.maxOpen > 0 && db.numOpen > db.maxOpen {
1539 return false
1540 }
1541 if req, ok := db.connRequests.TakeRandom(); ok {
1542 if err == nil {
1543 dc.inUse = true
1544 }
1545 req <- connRequest{
1546 conn: dc,
1547 err: err,
1548 }
1549 return true
1550 } else if err == nil && !db.closed {
1551 if db.maxIdleConnsLocked() > len(db.freeConn) {
1552 db.freeConn = append(db.freeConn, dc)
1553 db.startCleanerLocked()
1554 return true
1555 }
1556 db.maxIdleClosed++
1557 }
1558 return false
1559 }
1560
1561
1562
1563
1564 const maxBadConnRetries = 2
1565
1566 func (db *DB) retry(fn func(strategy connReuseStrategy) error) error {
1567 for i := int64(0); i < maxBadConnRetries; i++ {
1568 err := fn(cachedOrNewConn)
1569
1570 if err == nil || !errors.Is(err, driver.ErrBadConn) {
1571 return err
1572 }
1573 }
1574
1575 return fn(alwaysNewConn)
1576 }
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586 func (db *DB) PrepareContext(ctx context.Context, query string) (*Stmt, error) {
1587 var stmt *Stmt
1588 var err error
1589
1590 err = db.retry(func(strategy connReuseStrategy) error {
1591 stmt, err = db.prepare(ctx, query, strategy)
1592 return err
1593 })
1594
1595 return stmt, err
1596 }
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606 func (db *DB) Prepare(query string) (*Stmt, error) {
1607 return db.PrepareContext(context.Background(), query)
1608 }
1609
1610 func (db *DB) prepare(ctx context.Context, query string, strategy connReuseStrategy) (*Stmt, error) {
1611
1612
1613
1614
1615
1616
1617 dc, err := db.conn(ctx, strategy)
1618 if err != nil {
1619 return nil, err
1620 }
1621 return db.prepareDC(ctx, dc, dc.releaseConn, nil, query)
1622 }
1623
1624
1625
1626
1627 func (db *DB) prepareDC(ctx context.Context, dc *driverConn, release func(error), cg stmtConnGrabber, query string) (*Stmt, error) {
1628 var ds *driverStmt
1629 var err error
1630 defer func() {
1631 release(err)
1632 }()
1633 withLock(dc, func() {
1634 ds, err = dc.prepareLocked(ctx, cg, query)
1635 })
1636 if err != nil {
1637 return nil, err
1638 }
1639 stmt := &Stmt{
1640 db: db,
1641 query: query,
1642 cg: cg,
1643 cgds: ds,
1644 }
1645
1646
1647
1648
1649 if cg == nil {
1650 stmt.css = []connStmt{{dc, ds}}
1651 stmt.lastNumClosed = db.numClosed.Load()
1652 db.addDep(stmt, stmt)
1653 }
1654 return stmt, nil
1655 }
1656
1657
1658
1659 func (db *DB) ExecContext(ctx context.Context, query string, args ...any) (Result, error) {
1660 var res Result
1661 var err error
1662
1663 err = db.retry(func(strategy connReuseStrategy) error {
1664 res, err = db.exec(ctx, query, args, strategy)
1665 return err
1666 })
1667
1668 return res, err
1669 }
1670
1671
1672
1673
1674
1675
1676 func (db *DB) Exec(query string, args ...any) (Result, error) {
1677 return db.ExecContext(context.Background(), query, args...)
1678 }
1679
1680 func (db *DB) exec(ctx context.Context, query string, args []any, strategy connReuseStrategy) (Result, error) {
1681 dc, err := db.conn(ctx, strategy)
1682 if err != nil {
1683 return nil, err
1684 }
1685 return db.execDC(ctx, dc, dc.releaseConn, query, args)
1686 }
1687
1688 func (db *DB) execDC(ctx context.Context, dc *driverConn, release func(error), query string, args []any) (res Result, err error) {
1689 defer func() {
1690 release(err)
1691 }()
1692 execerCtx, ok := dc.ci.(driver.ExecerContext)
1693 var execer driver.Execer
1694 if !ok {
1695 execer, ok = dc.ci.(driver.Execer)
1696 }
1697 if ok {
1698 var nvdargs []driver.NamedValue
1699 var resi driver.Result
1700 withLock(dc, func() {
1701 nvdargs, err = driverArgsConnLocked(dc.ci, nil, args)
1702 if err != nil {
1703 return
1704 }
1705 resi, err = ctxDriverExec(ctx, execerCtx, execer, query, nvdargs)
1706 })
1707 if err != driver.ErrSkip {
1708 if err != nil {
1709 return nil, err
1710 }
1711 return driverResult{dc, resi}, nil
1712 }
1713 }
1714
1715 var si driver.Stmt
1716 withLock(dc, func() {
1717 si, err = ctxDriverPrepare(ctx, dc.ci, query)
1718 })
1719 if err != nil {
1720 return nil, err
1721 }
1722 ds := &driverStmt{Locker: dc, si: si}
1723 defer ds.Close()
1724 return resultFromStatement(ctx, dc.ci, ds, args...)
1725 }
1726
1727
1728
1729 func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error) {
1730 var rows *Rows
1731 var err error
1732
1733 err = db.retry(func(strategy connReuseStrategy) error {
1734 rows, err = db.query(ctx, query, args, strategy)
1735 return err
1736 })
1737
1738 return rows, err
1739 }
1740
1741
1742
1743
1744
1745
1746 func (db *DB) Query(query string, args ...any) (*Rows, error) {
1747 return db.QueryContext(context.Background(), query, args...)
1748 }
1749
1750 func (db *DB) query(ctx context.Context, query string, args []any, strategy connReuseStrategy) (*Rows, error) {
1751 dc, err := db.conn(ctx, strategy)
1752 if err != nil {
1753 return nil, err
1754 }
1755
1756 return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args)
1757 }
1758
1759
1760
1761
1762
1763 func (db *DB) queryDC(ctx, txctx context.Context, dc *driverConn, releaseConn func(error), query string, args []any) (*Rows, error) {
1764 queryerCtx, ok := dc.ci.(driver.QueryerContext)
1765 var queryer driver.Queryer
1766 if !ok {
1767 queryer, ok = dc.ci.(driver.Queryer)
1768 }
1769 if ok {
1770 var nvdargs []driver.NamedValue
1771 var rowsi driver.Rows
1772 var err error
1773 withLock(dc, func() {
1774 nvdargs, err = driverArgsConnLocked(dc.ci, nil, args)
1775 if err != nil {
1776 return
1777 }
1778 rowsi, err = ctxDriverQuery(ctx, queryerCtx, queryer, query, nvdargs)
1779 })
1780 if err != driver.ErrSkip {
1781 if err != nil {
1782 releaseConn(err)
1783 return nil, err
1784 }
1785
1786
1787 rows := &Rows{
1788 dc: dc,
1789 releaseConn: releaseConn,
1790 rowsi: rowsi,
1791 }
1792 rows.initContextClose(ctx, txctx)
1793 return rows, nil
1794 }
1795 }
1796
1797 var si driver.Stmt
1798 var err error
1799 withLock(dc, func() {
1800 si, err = ctxDriverPrepare(ctx, dc.ci, query)
1801 })
1802 if err != nil {
1803 releaseConn(err)
1804 return nil, err
1805 }
1806
1807 ds := &driverStmt{Locker: dc, si: si}
1808 rowsi, err := rowsiFromStatement(ctx, dc.ci, ds, args...)
1809 if err != nil {
1810 ds.Close()
1811 releaseConn(err)
1812 return nil, err
1813 }
1814
1815
1816
1817 rows := &Rows{
1818 dc: dc,
1819 releaseConn: releaseConn,
1820 rowsi: rowsi,
1821 closeStmt: ds,
1822 }
1823 rows.initContextClose(ctx, txctx)
1824 return rows, nil
1825 }
1826
1827
1828
1829
1830
1831
1832
1833 func (db *DB) QueryRowContext(ctx context.Context, query string, args ...any) *Row {
1834 rows, err := db.QueryContext(ctx, query, args...)
1835 return &Row{rows: rows, err: err}
1836 }
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847 func (db *DB) QueryRow(query string, args ...any) *Row {
1848 return db.QueryRowContext(context.Background(), query, args...)
1849 }
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861 func (db *DB) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error) {
1862 var tx *Tx
1863 var err error
1864
1865 err = db.retry(func(strategy connReuseStrategy) error {
1866 tx, err = db.begin(ctx, opts, strategy)
1867 return err
1868 })
1869
1870 return tx, err
1871 }
1872
1873
1874
1875
1876
1877
1878 func (db *DB) Begin() (*Tx, error) {
1879 return db.BeginTx(context.Background(), nil)
1880 }
1881
1882 func (db *DB) begin(ctx context.Context, opts *TxOptions, strategy connReuseStrategy) (tx *Tx, err error) {
1883 dc, err := db.conn(ctx, strategy)
1884 if err != nil {
1885 return nil, err
1886 }
1887 return db.beginDC(ctx, dc, dc.releaseConn, opts)
1888 }
1889
1890
1891 func (db *DB) beginDC(ctx context.Context, dc *driverConn, release func(error), opts *TxOptions) (tx *Tx, err error) {
1892 var txi driver.Tx
1893 keepConnOnRollback := false
1894 withLock(dc, func() {
1895 _, hasSessionResetter := dc.ci.(driver.SessionResetter)
1896 _, hasConnectionValidator := dc.ci.(driver.Validator)
1897 keepConnOnRollback = hasSessionResetter && hasConnectionValidator
1898 txi, err = ctxDriverBegin(ctx, opts, dc.ci)
1899 })
1900 if err != nil {
1901 release(err)
1902 return nil, err
1903 }
1904
1905
1906
1907 ctx, cancel := context.WithCancel(ctx)
1908 tx = &Tx{
1909 db: db,
1910 dc: dc,
1911 releaseConn: release,
1912 txi: txi,
1913 cancel: cancel,
1914 keepConnOnRollback: keepConnOnRollback,
1915 ctx: ctx,
1916 }
1917 go tx.awaitDone()
1918 return tx, nil
1919 }
1920
1921
1922 func (db *DB) Driver() driver.Driver {
1923 return db.connector.Driver()
1924 }
1925
1926
1927
1928 var ErrConnDone = errors.New("sql: connection is already closed")
1929
1930
1931
1932
1933
1934
1935
1936
1937 func (db *DB) Conn(ctx context.Context) (*Conn, error) {
1938 var dc *driverConn
1939 var err error
1940
1941 err = db.retry(func(strategy connReuseStrategy) error {
1942 dc, err = db.conn(ctx, strategy)
1943 return err
1944 })
1945
1946 if err != nil {
1947 return nil, err
1948 }
1949
1950 conn := &Conn{
1951 db: db,
1952 dc: dc,
1953 }
1954 return conn, nil
1955 }
1956
1957 type releaseConn func(error)
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968 type Conn struct {
1969 db *DB
1970
1971
1972
1973
1974 closemu sync.RWMutex
1975
1976
1977
1978 dc *driverConn
1979
1980
1981
1982 done atomic.Bool
1983
1984 releaseConnOnce sync.Once
1985
1986
1987 releaseConnCache releaseConn
1988 }
1989
1990
1991
1992 func (c *Conn) grabConn(context.Context) (*driverConn, releaseConn, error) {
1993 if c.done.Load() {
1994 return nil, nil, ErrConnDone
1995 }
1996 c.releaseConnOnce.Do(func() {
1997 c.releaseConnCache = c.closemuRUnlockCondReleaseConn
1998 })
1999 c.closemu.RLock()
2000 return c.dc, c.releaseConnCache, nil
2001 }
2002
2003
2004 func (c *Conn) PingContext(ctx context.Context) error {
2005 dc, release, err := c.grabConn(ctx)
2006 if err != nil {
2007 return err
2008 }
2009 return c.db.pingDC(ctx, dc, release)
2010 }
2011
2012
2013
2014 func (c *Conn) ExecContext(ctx context.Context, query string, args ...any) (Result, error) {
2015 dc, release, err := c.grabConn(ctx)
2016 if err != nil {
2017 return nil, err
2018 }
2019 return c.db.execDC(ctx, dc, release, query, args)
2020 }
2021
2022
2023
2024 func (c *Conn) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error) {
2025 dc, release, err := c.grabConn(ctx)
2026 if err != nil {
2027 return nil, err
2028 }
2029 return c.db.queryDC(ctx, nil, dc, release, query, args)
2030 }
2031
2032
2033
2034
2035
2036
2037
2038 func (c *Conn) QueryRowContext(ctx context.Context, query string, args ...any) *Row {
2039 rows, err := c.QueryContext(ctx, query, args...)
2040 return &Row{rows: rows, err: err}
2041 }
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051 func (c *Conn) PrepareContext(ctx context.Context, query string) (*Stmt, error) {
2052 dc, release, err := c.grabConn(ctx)
2053 if err != nil {
2054 return nil, err
2055 }
2056 return c.db.prepareDC(ctx, dc, release, c, query)
2057 }
2058
2059
2060
2061
2062
2063
2064 func (c *Conn) Raw(f func(driverConn any) error) (err error) {
2065 var dc *driverConn
2066 var release releaseConn
2067
2068
2069 dc, release, err = c.grabConn(nil)
2070 if err != nil {
2071 return
2072 }
2073 fPanic := true
2074 dc.Mutex.Lock()
2075 defer func() {
2076 dc.Mutex.Unlock()
2077
2078
2079
2080
2081 if fPanic {
2082 err = driver.ErrBadConn
2083 }
2084 release(err)
2085 }()
2086 err = f(dc.ci)
2087 fPanic = false
2088
2089 return
2090 }
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102 func (c *Conn) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error) {
2103 dc, release, err := c.grabConn(ctx)
2104 if err != nil {
2105 return nil, err
2106 }
2107 return c.db.beginDC(ctx, dc, release, opts)
2108 }
2109
2110
2111
2112 func (c *Conn) closemuRUnlockCondReleaseConn(err error) {
2113 c.closemu.RUnlock()
2114 if errors.Is(err, driver.ErrBadConn) {
2115 c.close(err)
2116 }
2117 }
2118
2119 func (c *Conn) txCtx() context.Context {
2120 return nil
2121 }
2122
2123 func (c *Conn) close(err error) error {
2124 if !c.done.CompareAndSwap(false, true) {
2125 return ErrConnDone
2126 }
2127
2128
2129
2130 c.closemu.Lock()
2131 defer c.closemu.Unlock()
2132
2133 c.dc.releaseConn(err)
2134 c.dc = nil
2135 c.db = nil
2136 return err
2137 }
2138
2139
2140
2141
2142
2143
2144 func (c *Conn) Close() error {
2145 return c.close(nil)
2146 }
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158 type Tx struct {
2159 db *DB
2160
2161
2162
2163
2164 closemu sync.RWMutex
2165
2166
2167
2168 dc *driverConn
2169 txi driver.Tx
2170
2171
2172
2173 releaseConn func(error)
2174
2175
2176
2177
2178 done atomic.Bool
2179
2180
2181
2182
2183 keepConnOnRollback bool
2184
2185
2186
2187 stmts struct {
2188 sync.Mutex
2189 v []*Stmt
2190 }
2191
2192
2193 cancel func()
2194
2195
2196 ctx context.Context
2197 }
2198
2199
2200
2201 func (tx *Tx) awaitDone() {
2202
2203
2204 <-tx.ctx.Done()
2205
2206
2207
2208
2209
2210
2211
2212 discardConnection := !tx.keepConnOnRollback
2213 tx.rollback(discardConnection)
2214 }
2215
2216 func (tx *Tx) isDone() bool {
2217 return tx.done.Load()
2218 }
2219
2220
2221
2222 var ErrTxDone = errors.New("sql: transaction has already been committed or rolled back")
2223
2224
2225
2226
2227 func (tx *Tx) close(err error) {
2228 tx.releaseConn(err)
2229 tx.dc = nil
2230 tx.txi = nil
2231 }
2232
2233
2234
2235 var hookTxGrabConn func()
2236
2237 func (tx *Tx) grabConn(ctx context.Context) (*driverConn, releaseConn, error) {
2238 select {
2239 default:
2240 case <-ctx.Done():
2241 return nil, nil, ctx.Err()
2242 }
2243
2244
2245
2246 tx.closemu.RLock()
2247 if tx.isDone() {
2248 tx.closemu.RUnlock()
2249 return nil, nil, ErrTxDone
2250 }
2251 if hookTxGrabConn != nil {
2252 hookTxGrabConn()
2253 }
2254 return tx.dc, tx.closemuRUnlockRelease, nil
2255 }
2256
2257 func (tx *Tx) txCtx() context.Context {
2258 return tx.ctx
2259 }
2260
2261
2262
2263
2264
2265 func (tx *Tx) closemuRUnlockRelease(error) {
2266 tx.closemu.RUnlock()
2267 }
2268
2269
2270 func (tx *Tx) closePrepared() {
2271 tx.stmts.Lock()
2272 defer tx.stmts.Unlock()
2273 for _, stmt := range tx.stmts.v {
2274 stmt.Close()
2275 }
2276 }
2277
2278
2279 func (tx *Tx) Commit() error {
2280
2281
2282
2283 select {
2284 default:
2285 case <-tx.ctx.Done():
2286 if tx.done.Load() {
2287 return ErrTxDone
2288 }
2289 return tx.ctx.Err()
2290 }
2291 if !tx.done.CompareAndSwap(false, true) {
2292 return ErrTxDone
2293 }
2294
2295
2296
2297
2298
2299 tx.cancel()
2300 tx.closemu.Lock()
2301 tx.closemu.Unlock()
2302
2303 var err error
2304 withLock(tx.dc, func() {
2305 err = tx.txi.Commit()
2306 })
2307 if !errors.Is(err, driver.ErrBadConn) {
2308 tx.closePrepared()
2309 }
2310 tx.close(err)
2311 return err
2312 }
2313
2314 var rollbackHook func()
2315
2316
2317
2318 func (tx *Tx) rollback(discardConn bool) error {
2319 if !tx.done.CompareAndSwap(false, true) {
2320 return ErrTxDone
2321 }
2322
2323 if rollbackHook != nil {
2324 rollbackHook()
2325 }
2326
2327
2328
2329
2330
2331 tx.cancel()
2332 tx.closemu.Lock()
2333 tx.closemu.Unlock()
2334
2335 var err error
2336 withLock(tx.dc, func() {
2337 err = tx.txi.Rollback()
2338 })
2339 if !errors.Is(err, driver.ErrBadConn) {
2340 tx.closePrepared()
2341 }
2342 if discardConn {
2343 err = driver.ErrBadConn
2344 }
2345 tx.close(err)
2346 return err
2347 }
2348
2349
2350 func (tx *Tx) Rollback() error {
2351 return tx.rollback(false)
2352 }
2353
2354
2355
2356
2357
2358
2359
2360
2361
2362
2363
2364 func (tx *Tx) PrepareContext(ctx context.Context, query string) (*Stmt, error) {
2365 dc, release, err := tx.grabConn(ctx)
2366 if err != nil {
2367 return nil, err
2368 }
2369
2370 stmt, err := tx.db.prepareDC(ctx, dc, release, tx, query)
2371 if err != nil {
2372 return nil, err
2373 }
2374 tx.stmts.Lock()
2375 tx.stmts.v = append(tx.stmts.v, stmt)
2376 tx.stmts.Unlock()
2377 return stmt, nil
2378 }
2379
2380
2381
2382
2383
2384
2385
2386
2387
2388
2389 func (tx *Tx) Prepare(query string) (*Stmt, error) {
2390 return tx.PrepareContext(context.Background(), query)
2391 }
2392
2393
2394
2395
2396
2397
2398
2399
2400
2401
2402
2403
2404
2405
2406
2407
2408
2409 func (tx *Tx) StmtContext(ctx context.Context, stmt *Stmt) *Stmt {
2410 dc, release, err := tx.grabConn(ctx)
2411 if err != nil {
2412 return &Stmt{stickyErr: err}
2413 }
2414 defer release(nil)
2415
2416 if tx.db != stmt.db {
2417 return &Stmt{stickyErr: errors.New("sql: Tx.Stmt: statement from different database used")}
2418 }
2419 var si driver.Stmt
2420 var parentStmt *Stmt
2421 stmt.mu.Lock()
2422 if stmt.closed || stmt.cg != nil {
2423
2424
2425
2426
2427
2428
2429 stmt.mu.Unlock()
2430 withLock(dc, func() {
2431 si, err = ctxDriverPrepare(ctx, dc.ci, stmt.query)
2432 })
2433 if err != nil {
2434 return &Stmt{stickyErr: err}
2435 }
2436 } else {
2437 stmt.removeClosedStmtLocked()
2438
2439
2440 for _, v := range stmt.css {
2441 if v.dc == dc {
2442 si = v.ds.si
2443 break
2444 }
2445 }
2446
2447 stmt.mu.Unlock()
2448
2449 if si == nil {
2450 var ds *driverStmt
2451 withLock(dc, func() {
2452 ds, err = stmt.prepareOnConnLocked(ctx, dc)
2453 })
2454 if err != nil {
2455 return &Stmt{stickyErr: err}
2456 }
2457 si = ds.si
2458 }
2459 parentStmt = stmt
2460 }
2461
2462 txs := &Stmt{
2463 db: tx.db,
2464 cg: tx,
2465 cgds: &driverStmt{
2466 Locker: dc,
2467 si: si,
2468 },
2469 parentStmt: parentStmt,
2470 query: stmt.query,
2471 }
2472 if parentStmt != nil {
2473 tx.db.addDep(parentStmt, txs)
2474 }
2475 tx.stmts.Lock()
2476 tx.stmts.v = append(tx.stmts.v, txs)
2477 tx.stmts.Unlock()
2478 return txs
2479 }
2480
2481
2482
2483
2484
2485
2486
2487
2488
2489
2490
2491
2492
2493
2494
2495
2496
2497 func (tx *Tx) Stmt(stmt *Stmt) *Stmt {
2498 return tx.StmtContext(context.Background(), stmt)
2499 }
2500
2501
2502
2503 func (tx *Tx) ExecContext(ctx context.Context, query string, args ...any) (Result, error) {
2504 dc, release, err := tx.grabConn(ctx)
2505 if err != nil {
2506 return nil, err
2507 }
2508 return tx.db.execDC(ctx, dc, release, query, args)
2509 }
2510
2511
2512
2513
2514
2515
2516 func (tx *Tx) Exec(query string, args ...any) (Result, error) {
2517 return tx.ExecContext(context.Background(), query, args...)
2518 }
2519
2520
2521 func (tx *Tx) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error) {
2522 dc, release, err := tx.grabConn(ctx)
2523 if err != nil {
2524 return nil, err
2525 }
2526
2527 return tx.db.queryDC(ctx, tx.ctx, dc, release, query, args)
2528 }
2529
2530
2531
2532
2533
2534 func (tx *Tx) Query(query string, args ...any) (*Rows, error) {
2535 return tx.QueryContext(context.Background(), query, args...)
2536 }
2537
2538
2539
2540
2541
2542
2543
2544 func (tx *Tx) QueryRowContext(ctx context.Context, query string, args ...any) *Row {
2545 rows, err := tx.QueryContext(ctx, query, args...)
2546 return &Row{rows: rows, err: err}
2547 }
2548
2549
2550
2551
2552
2553
2554
2555
2556
2557
2558 func (tx *Tx) QueryRow(query string, args ...any) *Row {
2559 return tx.QueryRowContext(context.Background(), query, args...)
2560 }
2561
2562
2563 type connStmt struct {
2564 dc *driverConn
2565 ds *driverStmt
2566 }
2567
2568
2569
2570 type stmtConnGrabber interface {
2571
2572
2573 grabConn(context.Context) (*driverConn, releaseConn, error)
2574
2575
2576
2577
2578 txCtx() context.Context
2579 }
2580
2581 var (
2582 _ stmtConnGrabber = &Tx{}
2583 _ stmtConnGrabber = &Conn{}
2584 )
2585
2586
2587
2588
2589
2590
2591
2592
2593
2594
2595 type Stmt struct {
2596
2597 db *DB
2598 query string
2599 stickyErr error
2600
2601 closemu sync.RWMutex
2602
2603
2604
2605
2606
2607
2608 cg stmtConnGrabber
2609 cgds *driverStmt
2610
2611
2612
2613
2614
2615
2616
2617 parentStmt *Stmt
2618
2619 mu sync.Mutex
2620 closed bool
2621
2622
2623
2624
2625
2626 css []connStmt
2627
2628
2629
2630 lastNumClosed uint64
2631 }
2632
2633
2634
2635 func (s *Stmt) ExecContext(ctx context.Context, args ...any) (Result, error) {
2636 s.closemu.RLock()
2637 defer s.closemu.RUnlock()
2638
2639 var res Result
2640 err := s.db.retry(func(strategy connReuseStrategy) error {
2641 dc, releaseConn, ds, err := s.connStmt(ctx, strategy)
2642 if err != nil {
2643 return err
2644 }
2645
2646 res, err = resultFromStatement(ctx, dc.ci, ds, args...)
2647 releaseConn(err)
2648 return err
2649 })
2650
2651 return res, err
2652 }
2653
2654
2655
2656
2657
2658
2659 func (s *Stmt) Exec(args ...any) (Result, error) {
2660 return s.ExecContext(context.Background(), args...)
2661 }
2662
2663 func resultFromStatement(ctx context.Context, ci driver.Conn, ds *driverStmt, args ...any) (Result, error) {
2664 ds.Lock()
2665 defer ds.Unlock()
2666
2667 dargs, err := driverArgsConnLocked(ci, ds, args)
2668 if err != nil {
2669 return nil, err
2670 }
2671
2672 resi, err := ctxDriverStmtExec(ctx, ds.si, dargs)
2673 if err != nil {
2674 return nil, err
2675 }
2676 return driverResult{ds.Locker, resi}, nil
2677 }
2678
2679
2680
2681
2682
2683 func (s *Stmt) removeClosedStmtLocked() {
2684 t := len(s.css)/2 + 1
2685 if t > 10 {
2686 t = 10
2687 }
2688 dbClosed := s.db.numClosed.Load()
2689 if dbClosed-s.lastNumClosed < uint64(t) {
2690 return
2691 }
2692
2693 s.db.mu.Lock()
2694 for i := 0; i < len(s.css); i++ {
2695 if s.css[i].dc.dbmuClosed {
2696 s.css[i] = s.css[len(s.css)-1]
2697
2698 s.css[len(s.css)-1] = connStmt{}
2699 s.css = s.css[:len(s.css)-1]
2700 i--
2701 }
2702 }
2703 s.db.mu.Unlock()
2704 s.lastNumClosed = dbClosed
2705 }
2706
2707
2708
2709
2710 func (s *Stmt) connStmt(ctx context.Context, strategy connReuseStrategy) (dc *driverConn, releaseConn func(error), ds *driverStmt, err error) {
2711 if err = s.stickyErr; err != nil {
2712 return
2713 }
2714 s.mu.Lock()
2715 if s.closed {
2716 s.mu.Unlock()
2717 err = errors.New("sql: statement is closed")
2718 return
2719 }
2720
2721
2722
2723 if s.cg != nil {
2724 s.mu.Unlock()
2725 dc, releaseConn, err = s.cg.grabConn(ctx)
2726 if err != nil {
2727 return
2728 }
2729 return dc, releaseConn, s.cgds, nil
2730 }
2731
2732 s.removeClosedStmtLocked()
2733 s.mu.Unlock()
2734
2735 dc, err = s.db.conn(ctx, strategy)
2736 if err != nil {
2737 return nil, nil, nil, err
2738 }
2739
2740 s.mu.Lock()
2741 for _, v := range s.css {
2742 if v.dc == dc {
2743 s.mu.Unlock()
2744 return dc, dc.releaseConn, v.ds, nil
2745 }
2746 }
2747 s.mu.Unlock()
2748
2749
2750 withLock(dc, func() {
2751 ds, err = s.prepareOnConnLocked(ctx, dc)
2752 })
2753 if err != nil {
2754 dc.releaseConn(err)
2755 return nil, nil, nil, err
2756 }
2757
2758 return dc, dc.releaseConn, ds, nil
2759 }
2760
2761
2762
2763 func (s *Stmt) prepareOnConnLocked(ctx context.Context, dc *driverConn) (*driverStmt, error) {
2764 si, err := dc.prepareLocked(ctx, s.cg, s.query)
2765 if err != nil {
2766 return nil, err
2767 }
2768 cs := connStmt{dc, si}
2769 s.mu.Lock()
2770 s.css = append(s.css, cs)
2771 s.mu.Unlock()
2772 return cs.ds, nil
2773 }
2774
2775
2776
2777 func (s *Stmt) QueryContext(ctx context.Context, args ...any) (*Rows, error) {
2778 s.closemu.RLock()
2779 defer s.closemu.RUnlock()
2780
2781 var rowsi driver.Rows
2782 var rows *Rows
2783
2784 err := s.db.retry(func(strategy connReuseStrategy) error {
2785 dc, releaseConn, ds, err := s.connStmt(ctx, strategy)
2786 if err != nil {
2787 return err
2788 }
2789
2790 rowsi, err = rowsiFromStatement(ctx, dc.ci, ds, args...)
2791 if err == nil {
2792
2793
2794 rows = &Rows{
2795 dc: dc,
2796 rowsi: rowsi,
2797
2798 }
2799
2800
2801 s.db.addDep(s, rows)
2802
2803
2804
2805 rows.releaseConn = func(err error) {
2806 releaseConn(err)
2807 s.db.removeDep(s, rows)
2808 }
2809 var txctx context.Context
2810 if s.cg != nil {
2811 txctx = s.cg.txCtx()
2812 }
2813 rows.initContextClose(ctx, txctx)
2814 return nil
2815 }
2816
2817 releaseConn(err)
2818 return err
2819 })
2820
2821 return rows, err
2822 }
2823
2824
2825
2826
2827
2828
2829 func (s *Stmt) Query(args ...any) (*Rows, error) {
2830 return s.QueryContext(context.Background(), args...)
2831 }
2832
2833 func rowsiFromStatement(ctx context.Context, ci driver.Conn, ds *driverStmt, args ...any) (driver.Rows, error) {
2834 ds.Lock()
2835 defer ds.Unlock()
2836 dargs, err := driverArgsConnLocked(ci, ds, args)
2837 if err != nil {
2838 return nil, err
2839 }
2840 return ctxDriverStmtQuery(ctx, ds.si, dargs)
2841 }
2842
2843
2844
2845
2846
2847
2848
2849 func (s *Stmt) QueryRowContext(ctx context.Context, args ...any) *Row {
2850 rows, err := s.QueryContext(ctx, args...)
2851 if err != nil {
2852 return &Row{err: err}
2853 }
2854 return &Row{rows: rows}
2855 }
2856
2857
2858
2859
2860
2861
2862
2863
2864
2865
2866
2867
2868
2869
2870
2871 func (s *Stmt) QueryRow(args ...any) *Row {
2872 return s.QueryRowContext(context.Background(), args...)
2873 }
2874
2875
2876 func (s *Stmt) Close() error {
2877 s.closemu.Lock()
2878 defer s.closemu.Unlock()
2879
2880 if s.stickyErr != nil {
2881 return s.stickyErr
2882 }
2883 s.mu.Lock()
2884 if s.closed {
2885 s.mu.Unlock()
2886 return nil
2887 }
2888 s.closed = true
2889 txds := s.cgds
2890 s.cgds = nil
2891
2892 s.mu.Unlock()
2893
2894 if s.cg == nil {
2895 return s.db.removeDep(s, s)
2896 }
2897
2898 if s.parentStmt != nil {
2899
2900
2901 return s.db.removeDep(s.parentStmt, s)
2902 }
2903 return txds.Close()
2904 }
2905
2906 func (s *Stmt) finalClose() error {
2907 s.mu.Lock()
2908 defer s.mu.Unlock()
2909 if s.css != nil {
2910 for _, v := range s.css {
2911 s.db.noteUnusedDriverStatement(v.dc, v.ds)
2912 v.dc.removeOpenStmt(v.ds)
2913 }
2914 s.css = nil
2915 }
2916 return nil
2917 }
2918
2919
2920
2921 type Rows struct {
2922 dc *driverConn
2923 releaseConn func(error)
2924 rowsi driver.Rows
2925 cancel func()
2926 closeStmt *driverStmt
2927
2928 contextDone atomic.Pointer[error]
2929
2930
2931
2932
2933
2934
2935 closemu sync.RWMutex
2936 lasterr error
2937 closed bool
2938
2939
2940
2941
2942
2943
2944
2945
2946 closemuScanHold bool
2947
2948
2949
2950
2951
2952 hitEOF bool
2953
2954
2955
2956 lastcols []driver.Value
2957
2958
2959
2960
2961
2962
2963 raw []byte
2964 }
2965
2966
2967
2968 func (rs *Rows) lasterrOrErrLocked(err error) error {
2969 if rs.lasterr != nil && rs.lasterr != io.EOF {
2970 return rs.lasterr
2971 }
2972 return err
2973 }
2974
2975
2976
2977 var bypassRowsAwaitDone = false
2978
2979 func (rs *Rows) initContextClose(ctx, txctx context.Context) {
2980 if ctx.Done() == nil && (txctx == nil || txctx.Done() == nil) {
2981 return
2982 }
2983 if bypassRowsAwaitDone {
2984 return
2985 }
2986 closectx, cancel := context.WithCancel(ctx)
2987 rs.cancel = cancel
2988 go rs.awaitDone(ctx, txctx, closectx)
2989 }
2990
2991
2992
2993
2994
2995
2996 func (rs *Rows) awaitDone(ctx, txctx, closectx context.Context) {
2997 var txctxDone <-chan struct{}
2998 if txctx != nil {
2999 txctxDone = txctx.Done()
3000 }
3001 select {
3002 case <-ctx.Done():
3003 err := ctx.Err()
3004 rs.contextDone.Store(&err)
3005 case <-txctxDone:
3006 err := txctx.Err()
3007 rs.contextDone.Store(&err)
3008 case <-closectx.Done():
3009
3010
3011 }
3012 rs.close(ctx.Err())
3013 }
3014
3015
3016
3017
3018
3019
3020
3021 func (rs *Rows) Next() bool {
3022
3023
3024
3025 rs.closemuRUnlockIfHeldByScan()
3026
3027 if rs.contextDone.Load() != nil {
3028 return false
3029 }
3030
3031 var doClose, ok bool
3032 withLock(rs.closemu.RLocker(), func() {
3033 doClose, ok = rs.nextLocked()
3034 })
3035 if doClose {
3036 rs.Close()
3037 }
3038 if doClose && !ok {
3039 rs.hitEOF = true
3040 }
3041 return ok
3042 }
3043
3044 func (rs *Rows) nextLocked() (doClose, ok bool) {
3045 if rs.closed {
3046 return false, false
3047 }
3048
3049
3050
3051 rs.dc.Lock()
3052 defer rs.dc.Unlock()
3053
3054 if rs.lastcols == nil {
3055 rs.lastcols = make([]driver.Value, len(rs.rowsi.Columns()))
3056 }
3057
3058 rs.lasterr = rs.rowsi.Next(rs.lastcols)
3059 if rs.lasterr != nil {
3060
3061 if rs.lasterr != io.EOF {
3062 return true, false
3063 }
3064 nextResultSet, ok := rs.rowsi.(driver.RowsNextResultSet)
3065 if !ok {
3066 return true, false
3067 }
3068
3069
3070
3071 if !nextResultSet.HasNextResultSet() {
3072 doClose = true
3073 }
3074 return doClose, false
3075 }
3076 return false, true
3077 }
3078
3079
3080
3081
3082
3083
3084
3085
3086
3087 func (rs *Rows) NextResultSet() bool {
3088
3089
3090
3091 rs.closemuRUnlockIfHeldByScan()
3092
3093 var doClose bool
3094 defer func() {
3095 if doClose {
3096 rs.Close()
3097 }
3098 }()
3099 rs.closemu.RLock()
3100 defer rs.closemu.RUnlock()
3101
3102 if rs.closed {
3103 return false
3104 }
3105
3106 rs.lastcols = nil
3107 nextResultSet, ok := rs.rowsi.(driver.RowsNextResultSet)
3108 if !ok {
3109 doClose = true
3110 return false
3111 }
3112
3113
3114
3115 rs.dc.Lock()
3116 defer rs.dc.Unlock()
3117
3118 rs.lasterr = nextResultSet.NextResultSet()
3119 if rs.lasterr != nil {
3120 doClose = true
3121 return false
3122 }
3123 return true
3124 }
3125
3126
3127
3128 func (rs *Rows) Err() error {
3129
3130
3131
3132
3133 if !rs.hitEOF {
3134 if errp := rs.contextDone.Load(); errp != nil {
3135 return *errp
3136 }
3137 }
3138
3139 rs.closemu.RLock()
3140 defer rs.closemu.RUnlock()
3141 return rs.lasterrOrErrLocked(nil)
3142 }
3143
3144
3145
3146
3147
3148
3149
3150 func (rs *Rows) rawbuf() []byte {
3151 if rs == nil {
3152
3153 return nil
3154 }
3155 return rs.raw
3156 }
3157
3158
3159
3160 func (rs *Rows) setrawbuf(b []byte) RawBytes {
3161 if rs == nil {
3162
3163 return RawBytes(b)
3164 }
3165 off := len(rs.raw)
3166 rs.raw = b
3167 return RawBytes(rs.raw[off:])
3168 }
3169
3170 var errRowsClosed = errors.New("sql: Rows are closed")
3171 var errNoRows = errors.New("sql: no Rows available")
3172
3173
3174
3175 func (rs *Rows) Columns() ([]string, error) {
3176 rs.closemu.RLock()
3177 defer rs.closemu.RUnlock()
3178 if rs.closed {
3179 return nil, rs.lasterrOrErrLocked(errRowsClosed)
3180 }
3181 if rs.rowsi == nil {
3182 return nil, rs.lasterrOrErrLocked(errNoRows)
3183 }
3184 rs.dc.Lock()
3185 defer rs.dc.Unlock()
3186
3187 return rs.rowsi.Columns(), nil
3188 }
3189
3190
3191
3192 func (rs *Rows) ColumnTypes() ([]*ColumnType, error) {
3193 rs.closemu.RLock()
3194 defer rs.closemu.RUnlock()
3195 if rs.closed {
3196 return nil, rs.lasterrOrErrLocked(errRowsClosed)
3197 }
3198 if rs.rowsi == nil {
3199 return nil, rs.lasterrOrErrLocked(errNoRows)
3200 }
3201 rs.dc.Lock()
3202 defer rs.dc.Unlock()
3203
3204 return rowsColumnInfoSetupConnLocked(rs.rowsi), nil
3205 }
3206
3207
3208 type ColumnType struct {
3209 name string
3210
3211 hasNullable bool
3212 hasLength bool
3213 hasPrecisionScale bool
3214
3215 nullable bool
3216 length int64
3217 databaseType string
3218 precision int64
3219 scale int64
3220 scanType reflect.Type
3221 }
3222
3223
3224 func (ci *ColumnType) Name() string {
3225 return ci.name
3226 }
3227
3228
3229
3230
3231
3232
3233 func (ci *ColumnType) Length() (length int64, ok bool) {
3234 return ci.length, ci.hasLength
3235 }
3236
3237
3238
3239 func (ci *ColumnType) DecimalSize() (precision, scale int64, ok bool) {
3240 return ci.precision, ci.scale, ci.hasPrecisionScale
3241 }
3242
3243
3244
3245
3246 func (ci *ColumnType) ScanType() reflect.Type {
3247 return ci.scanType
3248 }
3249
3250
3251
3252 func (ci *ColumnType) Nullable() (nullable, ok bool) {
3253 return ci.nullable, ci.hasNullable
3254 }
3255
3256
3257
3258
3259
3260
3261
3262 func (ci *ColumnType) DatabaseTypeName() string {
3263 return ci.databaseType
3264 }
3265
3266 func rowsColumnInfoSetupConnLocked(rowsi driver.Rows) []*ColumnType {
3267 names := rowsi.Columns()
3268
3269 list := make([]*ColumnType, len(names))
3270 for i := range list {
3271 ci := &ColumnType{
3272 name: names[i],
3273 }
3274 list[i] = ci
3275
3276 if prop, ok := rowsi.(driver.RowsColumnTypeScanType); ok {
3277 ci.scanType = prop.ColumnTypeScanType(i)
3278 } else {
3279 ci.scanType = reflect.TypeFor[any]()
3280 }
3281 if prop, ok := rowsi.(driver.RowsColumnTypeDatabaseTypeName); ok {
3282 ci.databaseType = prop.ColumnTypeDatabaseTypeName(i)
3283 }
3284 if prop, ok := rowsi.(driver.RowsColumnTypeLength); ok {
3285 ci.length, ci.hasLength = prop.ColumnTypeLength(i)
3286 }
3287 if prop, ok := rowsi.(driver.RowsColumnTypeNullable); ok {
3288 ci.nullable, ci.hasNullable = prop.ColumnTypeNullable(i)
3289 }
3290 if prop, ok := rowsi.(driver.RowsColumnTypePrecisionScale); ok {
3291 ci.precision, ci.scale, ci.hasPrecisionScale = prop.ColumnTypePrecisionScale(i)
3292 }
3293 }
3294 return list
3295 }
3296
3297
3298
3299
3300
3301
3302
3303
3304
3305
3306
3307
3308
3309
3310
3311
3312
3313
3314
3315
3316
3317
3318
3319
3320
3321
3322
3323
3324
3325
3326
3327
3328
3329
3330
3331
3332
3333
3334
3335
3336
3337
3338
3339
3340
3341
3342
3343
3344
3345
3346
3347
3348
3349
3350
3351
3352
3353
3354
3355
3356
3357 func (rs *Rows) Scan(dest ...any) error {
3358 if rs.closemuScanHold {
3359
3360
3361 return fmt.Errorf("sql: Scan called without calling Next (closemuScanHold)")
3362 }
3363 rs.closemu.RLock()
3364
3365 if rs.lasterr != nil && rs.lasterr != io.EOF {
3366 rs.closemu.RUnlock()
3367 return rs.lasterr
3368 }
3369 if rs.closed {
3370 err := rs.lasterrOrErrLocked(errRowsClosed)
3371 rs.closemu.RUnlock()
3372 return err
3373 }
3374
3375 if scanArgsContainRawBytes(dest) {
3376 rs.closemuScanHold = true
3377 rs.raw = rs.raw[:0]
3378 } else {
3379 rs.closemu.RUnlock()
3380 }
3381
3382 if rs.lastcols == nil {
3383 rs.closemuRUnlockIfHeldByScan()
3384 return errors.New("sql: Scan called without calling Next")
3385 }
3386 if len(dest) != len(rs.lastcols) {
3387 rs.closemuRUnlockIfHeldByScan()
3388 return fmt.Errorf("sql: expected %d destination arguments in Scan, not %d", len(rs.lastcols), len(dest))
3389 }
3390
3391 for i, sv := range rs.lastcols {
3392 err := convertAssignRows(dest[i], sv, rs)
3393 if err != nil {
3394 rs.closemuRUnlockIfHeldByScan()
3395 return fmt.Errorf(`sql: Scan error on column index %d, name %q: %w`, i, rs.rowsi.Columns()[i], err)
3396 }
3397 }
3398 return nil
3399 }
3400
3401
3402
3403 func (rs *Rows) closemuRUnlockIfHeldByScan() {
3404 if rs.closemuScanHold {
3405 rs.closemuScanHold = false
3406 rs.closemu.RUnlock()
3407 }
3408 }
3409
3410 func scanArgsContainRawBytes(args []any) bool {
3411 for _, a := range args {
3412 if _, ok := a.(*RawBytes); ok {
3413 return true
3414 }
3415 }
3416 return false
3417 }
3418
3419
3420
3421 var rowsCloseHook = func() func(*Rows, *error) { return nil }
3422
3423
3424
3425
3426
3427 func (rs *Rows) Close() error {
3428
3429
3430
3431 rs.closemuRUnlockIfHeldByScan()
3432
3433 return rs.close(nil)
3434 }
3435
3436 func (rs *Rows) close(err error) error {
3437 rs.closemu.Lock()
3438 defer rs.closemu.Unlock()
3439
3440 if rs.closed {
3441 return nil
3442 }
3443 rs.closed = true
3444
3445 if rs.lasterr == nil {
3446 rs.lasterr = err
3447 }
3448
3449 withLock(rs.dc, func() {
3450 err = rs.rowsi.Close()
3451 })
3452 if fn := rowsCloseHook(); fn != nil {
3453 fn(rs, &err)
3454 }
3455 if rs.cancel != nil {
3456 rs.cancel()
3457 }
3458
3459 if rs.closeStmt != nil {
3460 rs.closeStmt.Close()
3461 }
3462 rs.releaseConn(err)
3463
3464 rs.lasterr = rs.lasterrOrErrLocked(err)
3465 return err
3466 }
3467
3468
3469 type Row struct {
3470
3471 err error
3472 rows *Rows
3473 }
3474
3475
3476
3477
3478
3479
3480 func (r *Row) Scan(dest ...any) error {
3481 if r.err != nil {
3482 return r.err
3483 }
3484
3485
3486
3487
3488
3489
3490
3491
3492
3493
3494
3495
3496
3497
3498 defer r.rows.Close()
3499 if scanArgsContainRawBytes(dest) {
3500 return errors.New("sql: RawBytes isn't allowed on Row.Scan")
3501 }
3502
3503 if !r.rows.Next() {
3504 if err := r.rows.Err(); err != nil {
3505 return err
3506 }
3507 return ErrNoRows
3508 }
3509 err := r.rows.Scan(dest...)
3510 if err != nil {
3511 return err
3512 }
3513
3514 return r.rows.Close()
3515 }
3516
3517
3518
3519
3520
3521 func (r *Row) Err() error {
3522 return r.err
3523 }
3524
3525
3526 type Result interface {
3527
3528
3529
3530
3531
3532 LastInsertId() (int64, error)
3533
3534
3535
3536
3537 RowsAffected() (int64, error)
3538 }
3539
3540 type driverResult struct {
3541 sync.Locker
3542 resi driver.Result
3543 }
3544
3545 func (dr driverResult) LastInsertId() (int64, error) {
3546 dr.Lock()
3547 defer dr.Unlock()
3548 return dr.resi.LastInsertId()
3549 }
3550
3551 func (dr driverResult) RowsAffected() (int64, error) {
3552 dr.Lock()
3553 defer dr.Unlock()
3554 return dr.resi.RowsAffected()
3555 }
3556
3557 func stack() string {
3558 var buf [2 << 10]byte
3559 return string(buf[:runtime.Stack(buf[:], false)])
3560 }
3561
3562
3563 func withLock(lk sync.Locker, fn func()) {
3564 lk.Lock()
3565 defer lk.Unlock()
3566 fn()
3567 }
3568
3569
3570
3571
3572
3573
3574
3575
3576
3577
3578
3579 type connRequestSet struct {
3580
3581 s []connRequestAndIndex
3582 }
3583
3584 type connRequestAndIndex struct {
3585
3586 req chan connRequest
3587
3588
3589
3590 curIdx *int
3591 }
3592
3593
3594
3595 func (s *connRequestSet) CloseAndRemoveAll() {
3596 for _, v := range s.s {
3597 *v.curIdx = -1
3598 close(v.req)
3599 }
3600 s.s = nil
3601 }
3602
3603
3604 func (s *connRequestSet) Len() int { return len(s.s) }
3605
3606
3607
3608 type connRequestDelHandle struct {
3609 idx *int
3610 }
3611
3612
3613
3614
3615 func (s *connRequestSet) Add(v chan connRequest) connRequestDelHandle {
3616 idx := len(s.s)
3617
3618
3619
3620
3621
3622
3623
3624
3625 idxPtr := &idx
3626 s.s = append(s.s, connRequestAndIndex{v, idxPtr})
3627 return connRequestDelHandle{idxPtr}
3628 }
3629
3630
3631
3632
3633
3634 func (s *connRequestSet) Delete(h connRequestDelHandle) bool {
3635 idx := *h.idx
3636 if idx < 0 {
3637 return false
3638 }
3639 s.deleteIndex(idx)
3640 return true
3641 }
3642
3643 func (s *connRequestSet) deleteIndex(idx int) {
3644
3645 *(s.s[idx].curIdx) = -1
3646
3647
3648 if idx < len(s.s)-1 {
3649 last := s.s[len(s.s)-1]
3650 *last.curIdx = idx
3651 s.s[idx] = last
3652 }
3653
3654 s.s[len(s.s)-1] = connRequestAndIndex{}
3655 s.s = s.s[:len(s.s)-1]
3656 }
3657
3658
3659
3660
3661 func (s *connRequestSet) TakeRandom() (v chan connRequest, ok bool) {
3662 if len(s.s) == 0 {
3663 return nil, false
3664 }
3665 pick := rand.IntN(len(s.s))
3666 e := s.s[pick]
3667 s.deleteIndex(pick)
3668 return e.req, true
3669 }
3670
View as plain text