1
2
3
4
5 package fuzz
6
7 import (
8 "bytes"
9 "context"
10 "crypto/sha256"
11 "encoding/json"
12 "errors"
13 "fmt"
14 "io"
15 "os"
16 "os/exec"
17 "reflect"
18 "runtime"
19 "sync"
20 "time"
21 )
22
// Tuning constants for fuzz worker processes and for the coordinator-side
// code that manages them.
const (
	// workerFuzzDuration is a 100ms interval. It is not referenced in the
	// code visible here; presumably it bounds how long a worker fuzzes one
	// input before reporting back (TODO confirm against the coordinator).
	workerFuzzDuration = 100 * time.Millisecond

	// workerTimeoutDuration is the interval worker.stop waits between
	// escalation steps (interrupt, kill, log) while a worker process
	// refuses to exit.
	workerTimeoutDuration = time.Second

	// workerExitCode is the exit status that worker.coordinate reports as
	// an internal failure of the fuzzing process, as opposed to a hang or
	// an unexpected termination.
	workerExitCode = 70

	// workerSharedMemSize is the size in bytes (100 MiB) that newWorker
	// requests for the shared memory file.
	workerSharedMemSize = 100 << 20
)
41
42
43
44
45
46 type worker struct {
47 dir string
48 binPath string
49 args []string
50 env []string
51
52 coordinator *coordinator
53
54 memMu chan *sharedMem
55
56 cmd *exec.Cmd
57 client *workerClient
58 waitErr error
59 interrupted bool
60 termC chan struct{}
61 }
62
63 func newWorker(c *coordinator, dir, binPath string, args, env []string) (*worker, error) {
64 mem, err := sharedMemTempFile(workerSharedMemSize)
65 if err != nil {
66 return nil, err
67 }
68 memMu := make(chan *sharedMem, 1)
69 memMu <- mem
70 return &worker{
71 dir: dir,
72 binPath: binPath,
73 args: args,
74 env: env[:len(env):len(env)],
75 coordinator: c,
76 memMu: memMu,
77 }, nil
78 }
79
80
81 func (w *worker) cleanup() error {
82 mem := <-w.memMu
83 if mem == nil {
84 return nil
85 }
86 close(w.memMu)
87 return mem.Close()
88 }
89
90
91
92
93
94
95
96
97
98
99
100 func (w *worker) coordinate(ctx context.Context) error {
101
102 for {
103
104 if !w.isRunning() {
105 if err := w.startAndPing(ctx); err != nil {
106 return err
107 }
108 }
109
110 select {
111 case <-ctx.Done():
112
113 err := w.stop()
114 if err != nil && !w.interrupted && !isInterruptError(err) {
115 return err
116 }
117 return ctx.Err()
118
119 case <-w.termC:
120
121 err := w.stop()
122 if w.interrupted {
123 panic("worker interrupted after unexpected termination")
124 }
125 if err == nil || isInterruptError(err) {
126
127
128
129
130
131
132
133
134
135
136 return nil
137 }
138 if exitErr, ok := err.(*exec.ExitError); ok && exitErr.ExitCode() == workerExitCode {
139
140
141 return fmt.Errorf("fuzzing process exited unexpectedly due to an internal failure: %w", err)
142 }
143
144
145 return fmt.Errorf("fuzzing process hung or terminated unexpectedly: %w", err)
146
147
148 case input := <-w.coordinator.inputC:
149
150 args := fuzzArgs{
151 Limit: input.limit,
152 Timeout: input.timeout,
153 Warmup: input.warmup,
154 CoverageData: input.coverageData,
155 }
156 entry, resp, isInternalError, err := w.client.fuzz(ctx, input.entry, args)
157 canMinimize := true
158 if err != nil {
159
160 w.stop()
161 if ctx.Err() != nil {
162
163 return ctx.Err()
164 }
165 if w.interrupted {
166
167
168 return fmt.Errorf("communicating with fuzzing process: %v", err)
169 }
170 if sig, ok := terminationSignal(w.waitErr); ok && !isCrashSignal(sig) {
171
172
173
174
175
176 return fmt.Errorf("fuzzing process terminated by unexpected signal; no crash will be recorded: %v", w.waitErr)
177 }
178 if isInternalError {
179
180
181 return err
182 }
183
184
185
186 resp.Err = fmt.Sprintf("fuzzing process hung or terminated unexpectedly: %v", w.waitErr)
187 canMinimize = false
188 }
189 result := fuzzResult{
190 limit: input.limit,
191 count: resp.Count,
192 totalDuration: resp.TotalDuration,
193 entryDuration: resp.InterestingDuration,
194 entry: entry,
195 crasherMsg: resp.Err,
196 coverageData: resp.CoverageData,
197 canMinimize: canMinimize,
198 }
199 w.coordinator.resultC <- result
200
201 case input := <-w.coordinator.minimizeC:
202
203 result, err := w.minimize(ctx, input)
204 if err != nil {
205
206
207
208
209 result = fuzzResult{
210 entry: input.entry,
211 crasherMsg: input.crasherMsg,
212 canMinimize: false,
213 limit: input.limit,
214 }
215 if result.crasherMsg == "" {
216 result.crasherMsg = err.Error()
217 }
218 }
219 w.coordinator.resultC <- result
220 }
221 }
222 }
223
224
225
226
227
228 func (w *worker) minimize(ctx context.Context, input fuzzMinimizeInput) (min fuzzResult, err error) {
229 if w.coordinator.opts.MinimizeTimeout != 0 {
230 var cancel func()
231 ctx, cancel = context.WithTimeout(ctx, w.coordinator.opts.MinimizeTimeout)
232 defer cancel()
233 }
234
235 args := minimizeArgs{
236 Limit: input.limit,
237 Timeout: input.timeout,
238 KeepCoverage: input.keepCoverage,
239 }
240 entry, resp, err := w.client.minimize(ctx, input.entry, args)
241 if err != nil {
242
243 w.stop()
244 if ctx.Err() != nil || w.interrupted || isInterruptError(w.waitErr) {
245
246
247
248
249
250 return fuzzResult{
251 entry: input.entry,
252 crasherMsg: input.crasherMsg,
253 coverageData: input.keepCoverage,
254 canMinimize: false,
255 limit: input.limit,
256 }, nil
257 }
258 return fuzzResult{
259 entry: entry,
260 crasherMsg: fmt.Sprintf("fuzzing process hung or terminated unexpectedly while minimizing: %v", err),
261 canMinimize: false,
262 limit: input.limit,
263 count: resp.Count,
264 totalDuration: resp.Duration,
265 }, nil
266 }
267
268 if input.crasherMsg != "" && resp.Err == "" {
269 return fuzzResult{}, fmt.Errorf("attempted to minimize a crash but could not reproduce")
270 }
271
272 return fuzzResult{
273 entry: entry,
274 crasherMsg: resp.Err,
275 coverageData: resp.CoverageData,
276 canMinimize: false,
277 limit: input.limit,
278 count: resp.Count,
279 totalDuration: resp.Duration,
280 }, nil
281 }
282
283 func (w *worker) isRunning() bool {
284 return w.cmd != nil
285 }
286
287
288
289
290
291
292
293
294
295 func (w *worker) startAndPing(ctx context.Context) error {
296 if ctx.Err() != nil {
297 return ctx.Err()
298 }
299 if err := w.start(); err != nil {
300 return err
301 }
302 if err := w.client.ping(ctx); err != nil {
303 w.stop()
304 if ctx.Err() != nil {
305 return ctx.Err()
306 }
307 if isInterruptError(err) {
308
309 return err
310 }
311
312 return fmt.Errorf("fuzzing process terminated without fuzzing: %w", err)
313 }
314 return nil
315 }
316
317
318
319
320
321
322
323
324
325
326
327 func (w *worker) start() (err error) {
328 if w.isRunning() {
329 panic("worker already started")
330 }
331 w.waitErr = nil
332 w.interrupted = false
333 w.termC = nil
334
335 cmd := exec.Command(w.binPath, w.args...)
336 cmd.Dir = w.dir
337 cmd.Env = w.env[:len(w.env):len(w.env)]
338
339
340
341
342
343
344
345
346
347 fuzzInR, fuzzInW, err := os.Pipe()
348 if err != nil {
349 return err
350 }
351 defer fuzzInR.Close()
352 fuzzOutR, fuzzOutW, err := os.Pipe()
353 if err != nil {
354 fuzzInW.Close()
355 return err
356 }
357 defer fuzzOutW.Close()
358 setWorkerComm(cmd, workerComm{fuzzIn: fuzzInR, fuzzOut: fuzzOutW, memMu: w.memMu})
359
360
361 if err := cmd.Start(); err != nil {
362 fuzzInW.Close()
363 fuzzOutR.Close()
364 return err
365 }
366
367
368
369
370 w.cmd = cmd
371 w.termC = make(chan struct{})
372 comm := workerComm{fuzzIn: fuzzInW, fuzzOut: fuzzOutR, memMu: w.memMu}
373 m := newMutator()
374 w.client = newWorkerClient(comm, m)
375
376 go func() {
377 w.waitErr = w.cmd.Wait()
378 close(w.termC)
379 }()
380
381 return nil
382 }
383
384
385
386
387
388
389
390
391
392
393 func (w *worker) stop() error {
394 if w.termC == nil {
395 panic("worker was not started successfully")
396 }
397 select {
398 case <-w.termC:
399
400 if w.client == nil {
401
402 return w.waitErr
403 }
404
405 w.client.Close()
406 w.cmd = nil
407 w.client = nil
408 return w.waitErr
409 default:
410
411 }
412
413
414
415 closeC := make(chan struct{})
416 go func() {
417 w.client.Close()
418 close(closeC)
419 }()
420
421 sig := os.Interrupt
422 if runtime.GOOS == "windows" {
423
424
425
426 sig = os.Kill
427 }
428
429 t := time.NewTimer(workerTimeoutDuration)
430 for {
431 select {
432 case <-w.termC:
433
434 t.Stop()
435 <-closeC
436 w.cmd = nil
437 w.client = nil
438 return w.waitErr
439
440 case <-t.C:
441
442 w.interrupted = true
443 switch sig {
444 case os.Interrupt:
445
446 w.cmd.Process.Signal(sig)
447 sig = os.Kill
448 t.Reset(workerTimeoutDuration)
449
450 case os.Kill:
451
452 w.cmd.Process.Signal(sig)
453 sig = nil
454 t.Reset(workerTimeoutDuration)
455
456 case nil:
457
458 fmt.Fprintf(w.coordinator.opts.Log, "waiting for fuzzing process to terminate...\n")
459 }
460 }
461 }
462 }
463
464
465
466
467
468
469
470
471
472
473
474 func RunFuzzWorker(ctx context.Context, fn func(CorpusEntry) error) error {
475 comm, err := getWorkerComm()
476 if err != nil {
477 return err
478 }
479 srv := &workerServer{
480 workerComm: comm,
481 fuzzFn: func(e CorpusEntry) (time.Duration, error) {
482 timer := time.AfterFunc(10*time.Second, func() {
483 panic("deadlocked!")
484 })
485 defer timer.Stop()
486 start := time.Now()
487 err := fn(e)
488 return time.Since(start), err
489 },
490 m: newMutator(),
491 }
492 return srv.serve(ctx)
493 }
494
495
496
497
498 type call struct {
499 Ping *pingArgs
500 Fuzz *fuzzArgs
501 Minimize *minimizeArgs
502 }
503
504
505
// minimizeArgs holds the arguments of a minimize call. The input to minimize
// is not part of it; it is passed through shared memory.
type minimizeArgs struct {
	// Timeout bounds the time spent minimizing. Zero means no time limit.
	Timeout time.Duration

	// Limit bounds the number of fuzz function calls. Zero (or a negative
	// value) means no limit.
	Limit int64

	// KeepCoverage, when non-nil, makes minimization look for smaller
	// inputs that still cover these bits. When nil, minimization looks for
	// smaller inputs that still make the fuzz function fail.
	KeepCoverage []byte

	// Index selects which value of the input to minimize. It is set by
	// workerClient.minimize for every minimizable value in turn.
	Index int
}
524
525
// minimizeResponse holds the result of a minimize call.
type minimizeResponse struct {
	// WroteToMem is true when the server wrote the minimized values back
	// to shared memory, so that the client should read the result from
	// there.
	WroteToMem bool

	// Err is the error message of the fuzz function for the minimized
	// input, or empty if minimization ended without an error.
	Err string

	// CoverageData is the coverage reported for the returned input. The
	// server sets it only when the values were written back and no error
	// was found.
	CoverageData []byte

	// Duration is the time the server spent on the call.
	Duration time.Duration

	// Count is the number of fuzz function calls, filled in by the client
	// from the shared memory header.
	Count int64
}
547
548
549
// fuzzArgs holds the arguments of a fuzz call. The input to fuzz is not part
// of it; it is passed through shared memory.
type fuzzArgs struct {
	// Timeout bounds the time spent fuzzing. Zero means no time limit.
	Timeout time.Duration

	// Limit bounds the number of fuzz function calls. Zero (or a negative
	// value) means no limit.
	Limit int64

	// Warmup makes the server run the input once, unmodified, instead of
	// mutating it.
	Warmup bool

	// CoverageData, when non-nil, replaces the server's coverage mask. Its
	// length must match that of any mask set by an earlier call.
	CoverageData []byte
}
568
569
// fuzzResponse holds the result of a fuzz call.
type fuzzResponse struct {
	// TotalDuration is the time the server spent on the whole call.
	// InterestingDuration is the time the fuzz function took on the input
	// that is being reported (the warmup input or one with new coverage).
	TotalDuration       time.Duration
	InterestingDuration time.Duration

	// Count is the number of fuzz function calls made during the call.
	Count int64

	// CoverageData is set when the reported input expanded coverage, and
	// after a warmup run when coverage is enabled.
	CoverageData []byte

	// Err is the error message of the fuzz function for the failing input,
	// or empty if no input failed.
	Err string

	// InternalErr is set when the call itself could not be carried out. It
	// does not indicate a failing input.
	InternalErr string
}
590
591
// pingArgs holds the arguments of a ping call. A ping carries no data.
type pingArgs struct{}
593
594
// pingResponse holds the result of a ping call. It carries no data.
type pingResponse struct{}
596
597
598
599
600
601
602
603
604
605
606 type workerComm struct {
607 fuzzIn, fuzzOut *os.File
608 memMu chan *sharedMem
609 }
610
611
612
613
614
615
616 type workerServer struct {
617 workerComm
618 m *mutator
619
620
621
622
623 coverageMask []byte
624
625
626
627
628
629
630 fuzzFn func(CorpusEntry) (time.Duration, error)
631 }
632
633
634
635
636
637
638
639
640
641
642
643 func (ws *workerServer) serve(ctx context.Context) error {
644 enc := json.NewEncoder(ws.fuzzOut)
645 dec := json.NewDecoder(&contextReader{ctx: ctx, r: ws.fuzzIn})
646 for {
647 var c call
648 if err := dec.Decode(&c); err != nil {
649 if err == io.EOF || err == ctx.Err() {
650 return nil
651 } else {
652 return err
653 }
654 }
655
656 var resp any
657 switch {
658 case c.Fuzz != nil:
659 resp = ws.fuzz(ctx, *c.Fuzz)
660 case c.Minimize != nil:
661 resp = ws.minimize(ctx, *c.Minimize)
662 case c.Ping != nil:
663 resp = ws.ping(ctx, *c.Ping)
664 default:
665 return errors.New("no arguments provided for any call")
666 }
667
668 if err := enc.Encode(resp); err != nil {
669 return err
670 }
671 }
672 }
673
674
675
676
677
678
679
680
681
682
// chainedMutations is the number of mutations the server applies on top of
// each other before resetting to the original values. workerClient.fuzz
// relies on the same number to replay the mutations of the reported input.
const chainedMutations = 5
684
685
686
687
688
689
690
691
692
693
694
695
696 func (ws *workerServer) fuzz(ctx context.Context, args fuzzArgs) (resp fuzzResponse) {
697 if args.CoverageData != nil {
698 if ws.coverageMask != nil && len(args.CoverageData) != len(ws.coverageMask) {
699 resp.InternalErr = fmt.Sprintf("unexpected size for CoverageData: got %d, expected %d", len(args.CoverageData), len(ws.coverageMask))
700 return resp
701 }
702 ws.coverageMask = args.CoverageData
703 }
704 start := time.Now()
705 defer func() { resp.TotalDuration = time.Since(start) }()
706
707 if args.Timeout != 0 {
708 var cancel func()
709 ctx, cancel = context.WithTimeout(ctx, args.Timeout)
710 defer cancel()
711 }
712 mem := <-ws.memMu
713 ws.m.r.save(&mem.header().randState, &mem.header().randInc)
714 defer func() {
715 resp.Count = mem.header().count
716 ws.memMu <- mem
717 }()
718 if args.Limit > 0 && mem.header().count >= args.Limit {
719 resp.InternalErr = fmt.Sprintf("mem.header().count %d already exceeds args.Limit %d", mem.header().count, args.Limit)
720 return resp
721 }
722
723 originalVals, err := unmarshalCorpusFile(mem.valueCopy())
724 if err != nil {
725 resp.InternalErr = err.Error()
726 return resp
727 }
728 vals := make([]any, len(originalVals))
729 copy(vals, originalVals)
730
731 shouldStop := func() bool {
732 return args.Limit > 0 && mem.header().count >= args.Limit
733 }
734 fuzzOnce := func(entry CorpusEntry) (dur time.Duration, cov []byte, errMsg string) {
735 mem.header().count++
736 var err error
737 dur, err = ws.fuzzFn(entry)
738 if err != nil {
739 errMsg = err.Error()
740 if errMsg == "" {
741 errMsg = "fuzz function failed with no input"
742 }
743 return dur, nil, errMsg
744 }
745 if ws.coverageMask != nil && countNewCoverageBits(ws.coverageMask, coverageSnapshot) > 0 {
746 return dur, coverageSnapshot, ""
747 }
748 return dur, nil, ""
749 }
750
751 if args.Warmup {
752 dur, _, errMsg := fuzzOnce(CorpusEntry{Values: vals})
753 if errMsg != "" {
754 resp.Err = errMsg
755 return resp
756 }
757 resp.InterestingDuration = dur
758 if coverageEnabled {
759 resp.CoverageData = coverageSnapshot
760 }
761 return resp
762 }
763
764 for {
765 select {
766 case <-ctx.Done():
767 return resp
768 default:
769 if mem.header().count%chainedMutations == 0 {
770 copy(vals, originalVals)
771 ws.m.r.save(&mem.header().randState, &mem.header().randInc)
772 }
773 ws.m.mutate(vals, cap(mem.valueRef()))
774
775 entry := CorpusEntry{Values: vals}
776 dur, cov, errMsg := fuzzOnce(entry)
777 if errMsg != "" {
778 resp.Err = errMsg
779 return resp
780 }
781 if cov != nil {
782 resp.CoverageData = cov
783 resp.InterestingDuration = dur
784 return resp
785 }
786 if shouldStop() {
787 return resp
788 }
789 }
790 }
791 }
792
793 func (ws *workerServer) minimize(ctx context.Context, args minimizeArgs) (resp minimizeResponse) {
794 start := time.Now()
795 defer func() { resp.Duration = time.Since(start) }()
796 mem := <-ws.memMu
797 defer func() { ws.memMu <- mem }()
798 vals, err := unmarshalCorpusFile(mem.valueCopy())
799 if err != nil {
800 panic(err)
801 }
802 inpHash := sha256.Sum256(mem.valueCopy())
803 if args.Timeout != 0 {
804 var cancel func()
805 ctx, cancel = context.WithTimeout(ctx, args.Timeout)
806 defer cancel()
807 }
808
809
810
811 success, err := ws.minimizeInput(ctx, vals, mem, args)
812 if success {
813 writeToMem(vals, mem)
814 outHash := sha256.Sum256(mem.valueCopy())
815 mem.header().rawInMem = false
816 resp.WroteToMem = true
817 if err != nil {
818 resp.Err = err.Error()
819 } else {
820
821
822
823
824
825 if outHash != inpHash {
826 resp.CoverageData = coverageSnapshot
827 } else {
828 resp.CoverageData = args.KeepCoverage
829 }
830 }
831 }
832 return resp
833 }
834
835
836
837
838
839
840 func (ws *workerServer) minimizeInput(ctx context.Context, vals []any, mem *sharedMem, args minimizeArgs) (success bool, retErr error) {
841 keepCoverage := args.KeepCoverage
842 memBytes := mem.valueRef()
843 bPtr := &memBytes
844 count := &mem.header().count
845 shouldStop := func() bool {
846 return ctx.Err() != nil ||
847 (args.Limit > 0 && *count >= args.Limit)
848 }
849 if shouldStop() {
850 return false, nil
851 }
852
853
854
855
856 *count++
857 _, retErr = ws.fuzzFn(CorpusEntry{Values: vals})
858 if keepCoverage != nil {
859 if !hasCoverageBit(keepCoverage, coverageSnapshot) || retErr != nil {
860 return false, nil
861 }
862 } else if retErr == nil {
863 return false, nil
864 }
865 mem.header().rawInMem = true
866
867
868
869
870
871 tryMinimized := func(candidate []byte) bool {
872 prev := vals[args.Index]
873 switch prev.(type) {
874 case []byte:
875 vals[args.Index] = candidate
876 case string:
877 vals[args.Index] = string(candidate)
878 default:
879 panic("impossible")
880 }
881 copy(*bPtr, candidate)
882 *bPtr = (*bPtr)[:len(candidate)]
883 mem.setValueLen(len(candidate))
884 *count++
885 _, err := ws.fuzzFn(CorpusEntry{Values: vals})
886 if err != nil {
887 retErr = err
888 if keepCoverage != nil {
889
890
891
892 keepCoverage = nil
893 }
894 return true
895 }
896
897 if keepCoverage != nil && isCoverageSubset(keepCoverage, coverageSnapshot) {
898 return true
899 }
900 vals[args.Index] = prev
901 return false
902 }
903 switch v := vals[args.Index].(type) {
904 case string:
905 minimizeBytes([]byte(v), tryMinimized, shouldStop)
906 case []byte:
907 minimizeBytes(v, tryMinimized, shouldStop)
908 default:
909 panic("impossible")
910 }
911 return true, retErr
912 }
913
914 func writeToMem(vals []any, mem *sharedMem) {
915 b := marshalCorpusFile(vals...)
916 mem.setValue(b)
917 }
918
919
920
921 func (ws *workerServer) ping(ctx context.Context, args pingArgs) pingResponse {
922 return pingResponse{}
923 }
924
925
926
927
928 type workerClient struct {
929 workerComm
930 m *mutator
931
932
933
934
935
936
937 mu sync.Mutex
938 }
939
940 func newWorkerClient(comm workerComm, m *mutator) *workerClient {
941 return &workerClient{workerComm: comm, m: m}
942 }
943
944
945
946
947 func (wc *workerClient) Close() error {
948 wc.mu.Lock()
949 defer wc.mu.Unlock()
950
951
952
953 if err := wc.fuzzIn.Close(); err != nil {
954 wc.fuzzOut.Close()
955 return err
956 }
957
958
959
960 if _, err := io.Copy(io.Discard, wc.fuzzOut); err != nil {
961 wc.fuzzOut.Close()
962 return err
963 }
964 return wc.fuzzOut.Close()
965 }
966
967
968
969
970
971
972
973
// errSharedMemClosed is returned by workerClient methods that find memMu
// closed when they try to acquire the shared memory.
var (
	errSharedMemClosed = errors.New("internal error: shared memory was closed and unmapped")
)
975
976
977
978 func (wc *workerClient) minimize(ctx context.Context, entryIn CorpusEntry, args minimizeArgs) (entryOut CorpusEntry, resp minimizeResponse, retErr error) {
979 wc.mu.Lock()
980 defer wc.mu.Unlock()
981
982 mem, ok := <-wc.memMu
983 if !ok {
984 return CorpusEntry{}, minimizeResponse{}, errSharedMemClosed
985 }
986 mem.header().count = 0
987 inp, err := corpusEntryData(entryIn)
988 if err != nil {
989 return CorpusEntry{}, minimizeResponse{}, err
990 }
991 mem.setValue(inp)
992 defer func() { wc.memMu <- mem }()
993 entryOut = entryIn
994 entryOut.Values, err = unmarshalCorpusFile(inp)
995 if err != nil {
996 return CorpusEntry{}, minimizeResponse{}, fmt.Errorf("workerClient.minimize unmarshaling provided value: %v", err)
997 }
998 for i, v := range entryOut.Values {
999 if !isMinimizable(reflect.TypeOf(v)) {
1000 continue
1001 }
1002
1003 wc.memMu <- mem
1004 args.Index = i
1005 c := call{Minimize: &args}
1006 callErr := wc.callLocked(ctx, c, &resp)
1007 mem, ok = <-wc.memMu
1008 if !ok {
1009 return CorpusEntry{}, minimizeResponse{}, errSharedMemClosed
1010 }
1011
1012 if callErr != nil {
1013 retErr = callErr
1014 if !mem.header().rawInMem {
1015
1016 return entryIn, minimizeResponse{}, retErr
1017 }
1018
1019
1020
1021 switch entryOut.Values[i].(type) {
1022 case string:
1023 entryOut.Values[i] = string(mem.valueCopy())
1024 case []byte:
1025 entryOut.Values[i] = mem.valueCopy()
1026 default:
1027 panic("impossible")
1028 }
1029 entryOut.Data = marshalCorpusFile(entryOut.Values...)
1030
1031 break
1032 }
1033
1034 if resp.WroteToMem {
1035
1036 entryOut.Data = mem.valueCopy()
1037 entryOut.Values, err = unmarshalCorpusFile(entryOut.Data)
1038 if err != nil {
1039 return CorpusEntry{}, minimizeResponse{}, fmt.Errorf("workerClient.minimize unmarshaling minimized value: %v", err)
1040 }
1041 }
1042
1043
1044 if args.Timeout != 0 {
1045 args.Timeout -= resp.Duration
1046 if args.Timeout <= 0 {
1047 break
1048 }
1049 }
1050 if args.Limit != 0 {
1051 args.Limit -= mem.header().count
1052 if args.Limit <= 0 {
1053 break
1054 }
1055 }
1056 }
1057 resp.Count = mem.header().count
1058 h := sha256.Sum256(entryOut.Data)
1059 entryOut.Path = fmt.Sprintf("%x", h[:4])
1060 return entryOut, resp, retErr
1061 }
1062
1063
1064 func (wc *workerClient) fuzz(ctx context.Context, entryIn CorpusEntry, args fuzzArgs) (entryOut CorpusEntry, resp fuzzResponse, isInternalError bool, err error) {
1065 wc.mu.Lock()
1066 defer wc.mu.Unlock()
1067
1068 mem, ok := <-wc.memMu
1069 if !ok {
1070 return CorpusEntry{}, fuzzResponse{}, true, errSharedMemClosed
1071 }
1072 mem.header().count = 0
1073 inp, err := corpusEntryData(entryIn)
1074 if err != nil {
1075 return CorpusEntry{}, fuzzResponse{}, true, err
1076 }
1077 mem.setValue(inp)
1078 wc.memMu <- mem
1079
1080 c := call{Fuzz: &args}
1081 callErr := wc.callLocked(ctx, c, &resp)
1082 if resp.InternalErr != "" {
1083 return CorpusEntry{}, fuzzResponse{}, true, errors.New(resp.InternalErr)
1084 }
1085 mem, ok = <-wc.memMu
1086 if !ok {
1087 return CorpusEntry{}, fuzzResponse{}, true, errSharedMemClosed
1088 }
1089 defer func() { wc.memMu <- mem }()
1090 resp.Count = mem.header().count
1091
1092 if !bytes.Equal(inp, mem.valueRef()) {
1093 return CorpusEntry{}, fuzzResponse{}, true, errors.New("workerServer.fuzz modified input")
1094 }
1095 needEntryOut := callErr != nil || resp.Err != "" ||
1096 (!args.Warmup && resp.CoverageData != nil)
1097 if needEntryOut {
1098 valuesOut, err := unmarshalCorpusFile(inp)
1099 if err != nil {
1100 return CorpusEntry{}, fuzzResponse{}, true, fmt.Errorf("unmarshaling fuzz input value after call: %v", err)
1101 }
1102 wc.m.r.restore(mem.header().randState, mem.header().randInc)
1103 if !args.Warmup {
1104
1105 numMutations := ((resp.Count - 1) % chainedMutations) + 1
1106 for i := int64(0); i < numMutations; i++ {
1107 wc.m.mutate(valuesOut, cap(mem.valueRef()))
1108 }
1109 }
1110 dataOut := marshalCorpusFile(valuesOut...)
1111
1112 h := sha256.Sum256(dataOut)
1113 name := fmt.Sprintf("%x", h[:4])
1114 entryOut = CorpusEntry{
1115 Parent: entryIn.Path,
1116 Path: name,
1117 Data: dataOut,
1118 Generation: entryIn.Generation + 1,
1119 }
1120 if args.Warmup {
1121
1122
1123 entryOut.IsSeed = entryIn.IsSeed
1124 }
1125 }
1126
1127 return entryOut, resp, false, callErr
1128 }
1129
1130
1131 func (wc *workerClient) ping(ctx context.Context) error {
1132 wc.mu.Lock()
1133 defer wc.mu.Unlock()
1134 c := call{Ping: &pingArgs{}}
1135 var resp pingResponse
1136 return wc.callLocked(ctx, c, &resp)
1137 }
1138
1139
1140
1141 func (wc *workerClient) callLocked(ctx context.Context, c call, resp any) (err error) {
1142 enc := json.NewEncoder(wc.fuzzIn)
1143 dec := json.NewDecoder(&contextReader{ctx: ctx, r: wc.fuzzOut})
1144 if err := enc.Encode(c); err != nil {
1145 return err
1146 }
1147 return dec.Decode(resp)
1148 }
1149
1150
1151
1152
1153
1154
1155
1156
1157
// contextReader wraps an io.Reader so that a Read can be abandoned when a
// context is done.
type contextReader struct {
	ctx context.Context
	r   io.Reader
}

// Read reads from the underlying reader into b, unless the context is done
// first, in which case it returns 0 and the context's error.
//
// The underlying read runs in its own goroutine. If the context is done
// while that read is still blocked, Read returns without waiting for it: the
// goroutine stays around until the read completes and may still write to b.
func (cr *contextReader) Read(b []byte) (int, error) {
	if err := cr.ctx.Err(); err != nil {
		return 0, err
	}

	type readResult struct {
		n   int
		err error
	}
	// Buffered so the reading goroutine can always deliver its result and
	// exit, even if nobody is waiting for it any more.
	resC := make(chan readResult, 1)
	go func() {
		n, err := cr.r.Read(b)
		resC <- readResult{n: n, err: err}
	}()

	select {
	case <-cr.ctx.Done():
		return 0, cr.ctx.Err()
	case res := <-resC:
		return res.n, res.err
	}
}
1185
View as plain text