Source file src/internal/fuzz/worker.go

     1  // Copyright 2020 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package fuzz
     6  
     7  import (
     8  	"bytes"
     9  	"context"
    10  	"crypto/sha256"
    11  	"encoding/json"
    12  	"errors"
    13  	"fmt"
    14  	"io"
    15  	"os"
    16  	"os/exec"
    17  	"reflect"
    18  	"runtime"
    19  	"sync"
    20  	"time"
    21  )
    22  
    23  const (
    24  	// workerFuzzDuration is the amount of time a worker can spend testing random
    25  	// variations of an input given by the coordinator.
    26  	workerFuzzDuration = 100 * time.Millisecond
    27  
    28  	// workerTimeoutDuration is the amount of time a worker can go without
    29  	// responding to the coordinator before being stopped.
    30  	workerTimeoutDuration = 1 * time.Second
    31  
    32  	// workerExitCode is used as an exit code by fuzz worker processes after an internal error.
    33  	// This distinguishes internal errors from uncontrolled panics and other crashes.
    34  	// Keep in sync with internal/fuzz.workerExitCode.
    35  	workerExitCode = 70
    36  
    37  	// workerSharedMemSize is the maximum size of the shared memory file used to
    38  	// communicate with workers. This limits the size of fuzz inputs.
    39  	workerSharedMemSize = 100 << 20 // 100 MB
    40  )
    41  
    42  // worker manages a worker process running a test binary. The worker object
    43  // exists only in the coordinator (the process started by 'go test -fuzz').
    44  // workerClient is used by the coordinator to send RPCs to the worker process,
    45  // which handles them with workerServer.
    46  type worker struct {
    47  	dir     string   // working directory, same as package directory
    48  	binPath string   // path to test executable
    49  	args    []string // arguments for test executable
    50  	env     []string // environment for test executable
    51  
    52  	coordinator *coordinator
    53  
    54  	memMu chan *sharedMem // mutex guarding shared memory with worker; persists across processes.
    55  
    56  	cmd         *exec.Cmd     // current worker process
    57  	client      *workerClient // used to communicate with worker process
    58  	waitErr     error         // last error returned by wait, set before termC is closed.
    59  	interrupted bool          // true after stop interrupts a running worker.
    60  	termC       chan struct{} // closed by wait when worker process terminates
    61  }
    62  
    63  func newWorker(c *coordinator, dir, binPath string, args, env []string) (*worker, error) {
    64  	mem, err := sharedMemTempFile(workerSharedMemSize)
    65  	if err != nil {
    66  		return nil, err
    67  	}
    68  	memMu := make(chan *sharedMem, 1)
    69  	memMu <- mem
    70  	return &worker{
    71  		dir:         dir,
    72  		binPath:     binPath,
    73  		args:        args,
    74  		env:         env[:len(env):len(env)], // copy on append to ensure workers don't overwrite each other.
    75  		coordinator: c,
    76  		memMu:       memMu,
    77  	}, nil
    78  }
    79  
    80  // cleanup releases persistent resources associated with the worker.
    81  func (w *worker) cleanup() error {
    82  	mem := <-w.memMu
    83  	if mem == nil {
    84  		return nil
    85  	}
    86  	close(w.memMu)
    87  	return mem.Close()
    88  }
    89  
    90  // coordinate runs the test binary to perform fuzzing.
    91  //
    92  // coordinate loops until ctx is cancelled or a fatal error is encountered.
    93  // If a test process terminates unexpectedly while fuzzing, coordinate will
    94  // attempt to restart and continue unless the termination can be attributed
    95  // to an interruption (from a timer or the user).
    96  //
    97  // While looping, coordinate receives inputs from the coordinator, passes
    98  // those inputs to the worker process, then passes the results back to
    99  // the coordinator.
   100  func (w *worker) coordinate(ctx context.Context) error {
   101  	// Main event loop.
   102  	for {
   103  		// Start or restart the worker if it's not running.
   104  		if !w.isRunning() {
   105  			if err := w.startAndPing(ctx); err != nil {
   106  				return err
   107  			}
   108  		}
   109  
   110  		select {
   111  		case <-ctx.Done():
   112  			// Worker was told to stop.
   113  			err := w.stop()
   114  			if err != nil && !w.interrupted && !isInterruptError(err) {
   115  				return err
   116  			}
   117  			return ctx.Err()
   118  
   119  		case <-w.termC:
   120  			// Worker process terminated unexpectedly while waiting for input.
   121  			err := w.stop()
   122  			if w.interrupted {
   123  				panic("worker interrupted after unexpected termination")
   124  			}
   125  			if err == nil || isInterruptError(err) {
   126  				// Worker stopped, either by exiting with status 0 or after being
   127  				// interrupted with a signal that was not sent by the coordinator.
   128  				//
   129  				// When the user presses ^C, on POSIX platforms, SIGINT is delivered to
   130  				// all processes in the group concurrently, and the worker may see it
   131  				// before the coordinator. The worker should exit 0 gracefully (in
   132  				// theory).
   133  				//
   134  				// This condition is probably intended by the user, so suppress
   135  				// the error.
   136  				return nil
   137  			}
   138  			if exitErr, ok := err.(*exec.ExitError); ok && exitErr.ExitCode() == workerExitCode {
   139  				// Worker exited with a code indicating F.Fuzz was not called correctly,
   140  				// for example, F.Fail was called first.
   141  				return fmt.Errorf("fuzzing process exited unexpectedly due to an internal failure: %w", err)
   142  			}
   143  			// Worker exited non-zero or was terminated by a non-interrupt
   144  			// signal (for example, SIGSEGV) while fuzzing.
   145  			return fmt.Errorf("fuzzing process hung or terminated unexpectedly: %w", err)
   146  			// TODO(jayconrod,katiehockman): if -keepfuzzing, restart worker.
   147  
   148  		case input := <-w.coordinator.inputC:
   149  			// Received input from coordinator.
   150  			args := fuzzArgs{
   151  				Limit:        input.limit,
   152  				Timeout:      input.timeout,
   153  				Warmup:       input.warmup,
   154  				CoverageData: input.coverageData,
   155  			}
   156  			entry, resp, isInternalError, err := w.client.fuzz(ctx, input.entry, args)
   157  			canMinimize := true
   158  			if err != nil {
   159  				// Error communicating with worker.
   160  				w.stop()
   161  				if ctx.Err() != nil {
   162  					// Timeout or interruption.
   163  					return ctx.Err()
   164  				}
   165  				if w.interrupted {
   166  					// Communication error before we stopped the worker.
   167  					// Report an error, but don't record a crasher.
   168  					return fmt.Errorf("communicating with fuzzing process: %v", err)
   169  				}
   170  				if sig, ok := terminationSignal(w.waitErr); ok && !isCrashSignal(sig) {
   171  					// Worker terminated by a signal that probably wasn't caused by a
   172  					// specific input to the fuzz function. For example, on Linux,
   173  					// the kernel (OOM killer) may send SIGKILL to a process using a lot
   174  					// of memory. Or the shell might send SIGHUP when the terminal
   175  					// is closed. Don't record a crasher.
   176  					return fmt.Errorf("fuzzing process terminated by unexpected signal; no crash will be recorded: %v", w.waitErr)
   177  				}
   178  				if isInternalError {
   179  					// An internal error occurred which shouldn't be considered
   180  					// a crash.
   181  					return err
   182  				}
   183  				// Unexpected termination. Set error message and fall through.
   184  				// We'll restart the worker on the next iteration.
   185  				// Don't attempt to minimize this since it crashed the worker.
   186  				resp.Err = fmt.Sprintf("fuzzing process hung or terminated unexpectedly: %v", w.waitErr)
   187  				canMinimize = false
   188  			}
   189  			result := fuzzResult{
   190  				limit:         input.limit,
   191  				count:         resp.Count,
   192  				totalDuration: resp.TotalDuration,
   193  				entryDuration: resp.InterestingDuration,
   194  				entry:         entry,
   195  				crasherMsg:    resp.Err,
   196  				coverageData:  resp.CoverageData,
   197  				canMinimize:   canMinimize,
   198  			}
   199  			w.coordinator.resultC <- result
   200  
   201  		case input := <-w.coordinator.minimizeC:
   202  			// Received input to minimize from coordinator.
   203  			result, err := w.minimize(ctx, input)
   204  			if err != nil {
   205  				// Error minimizing. Send back the original input. If it didn't cause
   206  				// an error before, report it as causing an error now.
   207  				// TODO: double-check this is handled correctly when
   208  				// implementing -keepfuzzing.
   209  				result = fuzzResult{
   210  					entry:       input.entry,
   211  					crasherMsg:  input.crasherMsg,
   212  					canMinimize: false,
   213  					limit:       input.limit,
   214  				}
   215  				if result.crasherMsg == "" {
   216  					result.crasherMsg = err.Error()
   217  				}
   218  			}
   219  			if shouldPrintDebugInfo() {
   220  				w.coordinator.debugLogf(
   221  					"input minimized, id: %s, original id: %s, crasher: %t, originally crasher: %t, minimizing took: %s",
   222  					result.entry.Path,
   223  					input.entry.Path,
   224  					result.crasherMsg != "",
   225  					input.crasherMsg != "",
   226  					result.totalDuration,
   227  				)
   228  			}
   229  			w.coordinator.resultC <- result
   230  		}
   231  	}
   232  }
   233  
   234  // minimize tells a worker process to attempt to find a smaller value that
   235  // either causes an error (if we started minimizing because we found an input
   236  // that causes an error) or preserves new coverage (if we started minimizing
   237  // because we found an input that expands coverage).
   238  func (w *worker) minimize(ctx context.Context, input fuzzMinimizeInput) (min fuzzResult, err error) {
   239  	if w.coordinator.opts.MinimizeTimeout != 0 {
   240  		var cancel func()
   241  		ctx, cancel = context.WithTimeout(ctx, w.coordinator.opts.MinimizeTimeout)
   242  		defer cancel()
   243  	}
   244  
   245  	args := minimizeArgs{
   246  		Limit:        input.limit,
   247  		Timeout:      input.timeout,
   248  		KeepCoverage: input.keepCoverage,
   249  	}
   250  	entry, resp, err := w.client.minimize(ctx, input.entry, args)
   251  	if err != nil {
   252  		// Error communicating with worker.
   253  		w.stop()
   254  		if ctx.Err() != nil || w.interrupted || isInterruptError(w.waitErr) {
   255  			// Worker was interrupted, possibly by the user pressing ^C.
   256  			// Normally, workers can handle interrupts and timeouts gracefully and
   257  			// will return without error. An error here indicates the worker
   258  			// may not have been in a good state, but the error won't be meaningful
   259  			// to the user. Just return the original crasher without logging anything.
   260  			return fuzzResult{
   261  				entry:        input.entry,
   262  				crasherMsg:   input.crasherMsg,
   263  				coverageData: input.keepCoverage,
   264  				canMinimize:  false,
   265  				limit:        input.limit,
   266  			}, nil
   267  		}
   268  		return fuzzResult{
   269  			entry:         entry,
   270  			crasherMsg:    fmt.Sprintf("fuzzing process hung or terminated unexpectedly while minimizing: %v", err),
   271  			canMinimize:   false,
   272  			limit:         input.limit,
   273  			count:         resp.Count,
   274  			totalDuration: resp.Duration,
   275  		}, nil
   276  	}
   277  
   278  	if input.crasherMsg != "" && resp.Err == "" {
   279  		return fuzzResult{}, fmt.Errorf("attempted to minimize a crash but could not reproduce")
   280  	}
   281  
   282  	return fuzzResult{
   283  		entry:         entry,
   284  		crasherMsg:    resp.Err,
   285  		coverageData:  resp.CoverageData,
   286  		canMinimize:   false,
   287  		limit:         input.limit,
   288  		count:         resp.Count,
   289  		totalDuration: resp.Duration,
   290  	}, nil
   291  }
   292  
   293  func (w *worker) isRunning() bool {
   294  	return w.cmd != nil
   295  }
   296  
   297  // startAndPing starts the worker process and sends it a message to make sure it
   298  // can communicate.
   299  //
   300  // startAndPing returns an error if any part of this didn't work, including if
   301  // the context is expired or the worker process was interrupted before it
   302  // responded. Errors that happen after start but before the ping response
   303  // likely indicate that the worker did not call F.Fuzz or called F.Fail first.
   304  // We don't record crashers for these errors.
   305  func (w *worker) startAndPing(ctx context.Context) error {
   306  	if ctx.Err() != nil {
   307  		return ctx.Err()
   308  	}
   309  	if err := w.start(); err != nil {
   310  		return err
   311  	}
   312  	if err := w.client.ping(ctx); err != nil {
   313  		w.stop()
   314  		if ctx.Err() != nil {
   315  			return ctx.Err()
   316  		}
   317  		if isInterruptError(err) {
   318  			// User may have pressed ^C before worker responded.
   319  			return err
   320  		}
   321  		// TODO: record and return stderr.
   322  		return fmt.Errorf("fuzzing process terminated without fuzzing: %w", err)
   323  	}
   324  	return nil
   325  }
   326  
   327  // start runs a new worker process.
   328  //
   329  // If the process couldn't be started, start returns an error. Start won't
   330  // return later termination errors from the process if they occur.
   331  //
   332  // If the process starts successfully, start returns nil. stop must be called
   333  // once later to clean up, even if the process terminates on its own.
   334  //
   335  // When the process terminates, w.waitErr is set to the error (if any), and
   336  // w.termC is closed.
   337  func (w *worker) start() (err error) {
   338  	if w.isRunning() {
   339  		panic("worker already started")
   340  	}
   341  	w.waitErr = nil
   342  	w.interrupted = false
   343  	w.termC = nil
   344  
   345  	cmd := exec.Command(w.binPath, w.args...)
   346  	cmd.Dir = w.dir
   347  	cmd.Env = w.env[:len(w.env):len(w.env)] // copy on append to ensure workers don't overwrite each other.
   348  
   349  	// Create the "fuzz_in" and "fuzz_out" pipes so we can communicate with
   350  	// the worker. We don't use stdin and stdout, since the test binary may
   351  	// do something else with those.
   352  	//
   353  	// Each pipe has a reader and a writer. The coordinator writes to fuzzInW
   354  	// and reads from fuzzOutR. The worker inherits fuzzInR and fuzzOutW.
   355  	// The coordinator closes fuzzInR and fuzzOutW after starting the worker,
   356  	// since we have no further need of them.
   357  	fuzzInR, fuzzInW, err := os.Pipe()
   358  	if err != nil {
   359  		return err
   360  	}
   361  	defer fuzzInR.Close()
   362  	fuzzOutR, fuzzOutW, err := os.Pipe()
   363  	if err != nil {
   364  		fuzzInW.Close()
   365  		return err
   366  	}
   367  	defer fuzzOutW.Close()
   368  	setWorkerComm(cmd, workerComm{fuzzIn: fuzzInR, fuzzOut: fuzzOutW, memMu: w.memMu})
   369  
   370  	// Start the worker process.
   371  	if err := cmd.Start(); err != nil {
   372  		fuzzInW.Close()
   373  		fuzzOutR.Close()
   374  		return err
   375  	}
   376  
   377  	// Worker started successfully.
   378  	// After this, w.client owns fuzzInW and fuzzOutR, so w.client.Close must be
   379  	// called later by stop.
   380  	w.cmd = cmd
   381  	w.termC = make(chan struct{})
   382  	comm := workerComm{fuzzIn: fuzzInW, fuzzOut: fuzzOutR, memMu: w.memMu}
   383  	m := newMutator()
   384  	w.client = newWorkerClient(comm, m)
   385  
   386  	go func() {
   387  		w.waitErr = w.cmd.Wait()
   388  		close(w.termC)
   389  	}()
   390  
   391  	return nil
   392  }
   393  
// stop tells the worker process to exit by closing w.client, then blocks until
// it terminates. If the worker doesn't terminate after a short time, stop
// signals it with os.Interrupt (where supported), then os.Kill.
//
// stop returns the error the process terminated with, if any (same as
// w.waitErr).
//
// stop must be called at least once after start returns successfully, even if
// the worker process terminates unexpectedly.
//
// Each escalation step waits workerTimeoutDuration. w.interrupted is set to
// true as soon as the first timeout expires, so callers can tell that a
// failure may have been caused by stop itself. Calling stop again after it has
// already cleaned up returns w.waitErr with no other effect. If the process
// is still alive after os.Kill, stop writes one message to the coordinator's
// log and then keeps waiting for termination indefinitely.
func (w *worker) stop() error {
	if w.termC == nil {
		panic("worker was not started successfully")
	}
	select {
	case <-w.termC:
		// Worker already terminated.
		if w.client == nil {
			// stop already called.
			return w.waitErr
		}
		// Possible unexpected termination.
		w.client.Close()
		w.cmd = nil
		w.client = nil
		return w.waitErr
	default:
		// Worker still running.
	}

	// Tell the worker to stop by closing fuzz_in. It won't actually stop until it
	// finishes with earlier calls.
	// Close runs in its own goroutine, presumably because it may block while a
	// call is still in flight (NOTE(review): confirm against workerClient.Close).
	// closeC is closed once Close returns, so we can wait for it before
	// clearing w.client.
	closeC := make(chan struct{})
	go func() {
		w.client.Close()
		close(closeC)
	}()

	sig := os.Interrupt
	if runtime.GOOS == "windows" {
		// Per https://golang.org/pkg/os/#Signal, “Interrupt is not implemented on
		// Windows; using it with os.Process.Signal will return an error.”
		// Fall back to Kill instead.
		sig = os.Kill
	}

	// sig holds the next escalation step: os.Interrupt, then os.Kill, then nil
	// (nothing left to send).
	t := time.NewTimer(workerTimeoutDuration)
	for {
		select {
		case <-w.termC:
			// Worker terminated.
			t.Stop()
			<-closeC
			w.cmd = nil
			w.client = nil
			return w.waitErr

		case <-t.C:
			// Timer fired before worker terminated.
			w.interrupted = true
			switch sig {
			case os.Interrupt:
				// Try to stop the worker with SIGINT and wait a little longer.
				// Signal errors are ignored; this is best effort.
				w.cmd.Process.Signal(sig)
				sig = os.Kill
				t.Reset(workerTimeoutDuration)

			case os.Kill:
				// Try to stop the worker with SIGKILL and keep waiting.
				w.cmd.Process.Signal(sig)
				sig = nil
				t.Reset(workerTimeoutDuration)

			case nil:
				// Still waiting. Print a message to let the user know why.
				// The timer is not reset, so this message is printed only once;
				// afterwards only termination can end the loop.
				fmt.Fprintf(w.coordinator.opts.Log, "waiting for fuzzing process to terminate...\n")
			}
		}
	}
}
   473  
   474  // RunFuzzWorker is called in a worker process to communicate with the
   475  // coordinator process in order to fuzz random inputs. RunFuzzWorker loops
   476  // until the coordinator tells it to stop.
   477  //
   478  // fn is a wrapper on the fuzz function. It may return an error to indicate
   479  // a given input "crashed". The coordinator will also record a crasher if
   480  // the function times out or terminates the process.
   481  //
   482  // RunFuzzWorker returns an error if it could not communicate with the
   483  // coordinator process.
   484  func RunFuzzWorker(ctx context.Context, fn func(CorpusEntry) error) error {
   485  	comm, err := getWorkerComm()
   486  	if err != nil {
   487  		return err
   488  	}
   489  	srv := &workerServer{
   490  		workerComm: comm,
   491  		fuzzFn: func(e CorpusEntry) (time.Duration, error) {
   492  			timer := time.AfterFunc(10*time.Second, func() {
   493  				panic("deadlocked!") // this error message won't be printed
   494  			})
   495  			defer timer.Stop()
   496  			start := time.Now()
   497  			err := fn(e)
   498  			return time.Since(start), err
   499  		},
   500  		m: newMutator(),
   501  	}
   502  	return srv.serve(ctx)
   503  }
   504  
   505  // call is serialized and sent from the coordinator on fuzz_in. It acts as
   506  // a minimalist RPC mechanism. Exactly one of its fields must be set to indicate
   507  // which method to call.
   508  type call struct {
   509  	Ping     *pingArgs
   510  	Fuzz     *fuzzArgs
   511  	Minimize *minimizeArgs
   512  }
   513  
   514  // minimizeArgs contains arguments to workerServer.minimize. The value to
   515  // minimize is already in shared memory.
   516  type minimizeArgs struct {
   517  	// Timeout is the time to spend minimizing. This may include time to start up,
   518  	// especially if the input causes the worker process to terminated, requiring
   519  	// repeated restarts.
   520  	Timeout time.Duration
   521  
   522  	// Limit is the maximum number of values to test, without spending more time
   523  	// than Duration. 0 indicates no limit.
   524  	Limit int64
   525  
   526  	// KeepCoverage is a set of coverage counters the worker should attempt to
   527  	// keep in minimized values. When provided, the worker will reject inputs that
   528  	// don't cause at least one of these bits to be set.
   529  	KeepCoverage []byte
   530  
   531  	// Index is the index of the fuzz target parameter to be minimized.
   532  	Index int
   533  }
   534  
   535  // minimizeResponse contains results from workerServer.minimize.
   536  type minimizeResponse struct {
   537  	// WroteToMem is true if the worker found a smaller input and wrote it to
   538  	// shared memory. If minimizeArgs.KeepCoverage was set, the minimized input
   539  	// preserved at least one coverage bit and did not cause an error.
   540  	// Otherwise, the minimized input caused some error, recorded in Err.
   541  	WroteToMem bool
   542  
   543  	// Err is the error string caused by the value in shared memory, if any.
   544  	Err string
   545  
   546  	// CoverageData is the set of coverage bits activated by the minimized value
   547  	// in shared memory. When set, it contains at least one bit from KeepCoverage.
   548  	// CoverageData will be nil if Err is set or if minimization failed.
   549  	CoverageData []byte
   550  
   551  	// Duration is the time spent minimizing, not including starting or cleaning up.
   552  	Duration time.Duration
   553  
   554  	// Count is the number of values tested.
   555  	Count int64
   556  }
   557  
   558  // fuzzArgs contains arguments to workerServer.fuzz. The value to fuzz is
   559  // passed in shared memory.
   560  type fuzzArgs struct {
   561  	// Timeout is the time to spend fuzzing, not including starting or
   562  	// cleaning up.
   563  	Timeout time.Duration
   564  
   565  	// Limit is the maximum number of values to test, without spending more time
   566  	// than Duration. 0 indicates no limit.
   567  	Limit int64
   568  
   569  	// Warmup indicates whether this is part of a warmup run, meaning that
   570  	// fuzzing should not occur. If coverageEnabled is true, then coverage data
   571  	// should be reported.
   572  	Warmup bool
   573  
   574  	// CoverageData is the coverage data. If set, the worker should update its
   575  	// local coverage data prior to fuzzing.
   576  	CoverageData []byte
   577  }
   578  
   579  // fuzzResponse contains results from workerServer.fuzz.
   580  type fuzzResponse struct {
   581  	// Duration is the time spent fuzzing, not including starting or cleaning up.
   582  	TotalDuration       time.Duration
   583  	InterestingDuration time.Duration
   584  
   585  	// Count is the number of values tested.
   586  	Count int64
   587  
   588  	// CoverageData is set if the value in shared memory expands coverage
   589  	// and therefore may be interesting to the coordinator.
   590  	CoverageData []byte
   591  
   592  	// Err is the error string caused by the value in shared memory, which is
   593  	// non-empty if the value in shared memory caused a crash.
   594  	Err string
   595  
   596  	// InternalErr is the error string caused by an internal error in the
   597  	// worker. This shouldn't be considered a crasher.
   598  	InternalErr string
   599  }
   600  
   601  // pingArgs contains arguments to workerServer.ping.
   602  type pingArgs struct{}
   603  
   604  // pingResponse contains results from workerServer.ping.
   605  type pingResponse struct{}
   606  
   607  // workerComm holds pipes and shared memory used for communication
   608  // between the coordinator process (client) and a worker process (server).
   609  // These values are unique to each worker; they are shared only with the
   610  // coordinator, not with other workers.
   611  //
   612  // Access to shared memory is synchronized implicitly over the RPC protocol
   613  // implemented in workerServer and workerClient. During a call, the client
   614  // (worker) has exclusive access to shared memory; at other times, the server
   615  // (coordinator) has exclusive access.
   616  type workerComm struct {
   617  	fuzzIn, fuzzOut *os.File
   618  	memMu           chan *sharedMem // mutex guarding shared memory
   619  }
   620  
   621  // workerServer is a minimalist RPC server, run by fuzz worker processes.
   622  // It allows the coordinator process (using workerClient) to call methods in a
   623  // worker process. This system allows the coordinator to run multiple worker
   624  // processes in parallel and to collect inputs that caused crashes from shared
   625  // memory after a worker process terminates unexpectedly.
   626  type workerServer struct {
   627  	workerComm
   628  	m *mutator
   629  
   630  	// coverageMask is the local coverage data for the worker. It is
   631  	// periodically updated to reflect the data in the coordinator when new
   632  	// coverage is found.
   633  	coverageMask []byte
   634  
   635  	// fuzzFn runs the worker's fuzz target on the given input and returns an
   636  	// error if it finds a crasher (the process may also exit or crash), and the
   637  	// time it took to run the input. It sets a deadline of 10 seconds, at which
   638  	// point it will panic with the assumption that the process is hanging or
   639  	// deadlocked.
   640  	fuzzFn func(CorpusEntry) (time.Duration, error)
   641  }
   642  
   643  // serve reads serialized RPC messages on fuzzIn. When serve receives a message,
   644  // it calls the corresponding method, then sends the serialized result back
   645  // on fuzzOut.
   646  //
   647  // serve handles RPC calls synchronously; it will not attempt to read a message
   648  // until the previous call has finished.
   649  //
   650  // serve returns errors that occurred when communicating over pipes. serve
   651  // does not return errors from method calls; those are passed through serialized
   652  // responses.
   653  func (ws *workerServer) serve(ctx context.Context) error {
   654  	enc := json.NewEncoder(ws.fuzzOut)
   655  	dec := json.NewDecoder(&contextReader{ctx: ctx, r: ws.fuzzIn})
   656  	for {
   657  		var c call
   658  		if err := dec.Decode(&c); err != nil {
   659  			if err == io.EOF || err == ctx.Err() {
   660  				return nil
   661  			} else {
   662  				return err
   663  			}
   664  		}
   665  
   666  		var resp any
   667  		switch {
   668  		case c.Fuzz != nil:
   669  			resp = ws.fuzz(ctx, *c.Fuzz)
   670  		case c.Minimize != nil:
   671  			resp = ws.minimize(ctx, *c.Minimize)
   672  		case c.Ping != nil:
   673  			resp = ws.ping(ctx, *c.Ping)
   674  		default:
   675  			return errors.New("no arguments provided for any call")
   676  		}
   677  
   678  		if err := enc.Encode(resp); err != nil {
   679  			return err
   680  		}
   681  	}
   682  }
   683  
   684  // chainedMutations is how many mutations are applied before the worker
   685  // resets the input to it's original state.
   686  // NOTE: this number was picked without much thought. It is low enough that
   687  // it seems to create a significant diversity in mutated inputs. We may want
   688  // to consider looking into this more closely once we have a proper performance
   689  // testing framework. Another option is to randomly pick the number of chained
   690  // mutations on each invocation of the workerServer.fuzz method (this appears to
   691  // be what libFuzzer does, although there seems to be no documentation which
   692  // explains why this choice was made.)
   693  const chainedMutations = 5
   694  
// fuzz runs the test function on random variations of the input value in shared
// memory for a limited duration or number of iterations.
//
// fuzz returns early if it finds an input that crashes the fuzz function (with
// fuzzResponse.Err set) or an input that expands coverage (with
// fuzzResponse.CoverageData and fuzzResponse.InterestingDuration set).
//
// fuzz does not modify the input in shared memory. Instead, it saves the
// initial PRNG state in shared memory and increments a counter in shared
// memory before each call to the test function. The caller may reconstruct
// the crashing input with this information, since the PRNG is deterministic.
//
// If args.Warmup is set, the unmutated input is run exactly once. When
// coverageEnabled is true, its coverage is returned as resp.CoverageData.
// Some problems are not crashes: a CoverageData size mismatch, the shared
// counter already at args.Limit, or an input that cannot be unmarshaled.
// These are reported through resp.InternalErr instead of resp.Err.
func (ws *workerServer) fuzz(ctx context.Context, args fuzzArgs) (resp fuzzResponse) {
	// Adopt the coordinator's coverage data when provided. Its length must
	// match any mask this worker already holds.
	if args.CoverageData != nil {
		if ws.coverageMask != nil && len(args.CoverageData) != len(ws.coverageMask) {
			resp.InternalErr = fmt.Sprintf("unexpected size for CoverageData: got %d, expected %d", len(args.CoverageData), len(ws.coverageMask))
			return resp
		}
		ws.coverageMask = args.CoverageData
	}
	start := time.Now()
	defer func() { resp.TotalDuration = time.Since(start) }()

	if args.Timeout != 0 {
		var cancel func()
		ctx, cancel = context.WithTimeout(ctx, args.Timeout)
		defer cancel()
	}
	// Take exclusive ownership of shared memory for the rest of the call. The
	// deferred function reports the final count and hands the memory back.
	mem := <-ws.memMu
	ws.m.r.save(&mem.header().randState, &mem.header().randInc)
	defer func() {
		resp.Count = mem.header().count
		ws.memMu <- mem
	}()
	if args.Limit > 0 && mem.header().count >= args.Limit {
		resp.InternalErr = fmt.Sprintf("mem.header().count %d already exceeds args.Limit %d", mem.header().count, args.Limit)
		return resp
	}

	// originalVals stays untouched. vals is mutated in place and is
	// periodically reset from originalVals.
	originalVals, err := unmarshalCorpusFile(mem.valueCopy())
	if err != nil {
		resp.InternalErr = err.Error()
		return resp
	}
	vals := make([]any, len(originalVals))
	copy(vals, originalVals)

	shouldStop := func() bool {
		return args.Limit > 0 && mem.header().count >= args.Limit
	}
	// fuzzOnce bumps the shared counter and runs the fuzz function on entry.
	// It returns the run's duration plus either a non-empty error message
	// (crash), or coverageSnapshot when countNewCoverageBits finds bits beyond
	// ws.coverageMask. coverageSnapshot is package-level state, presumably
	// filled in by coverage instrumentation during the run
	// (NOTE(review): confirm).
	fuzzOnce := func(entry CorpusEntry) (dur time.Duration, cov []byte, errMsg string) {
		mem.header().count++
		var err error
		dur, err = ws.fuzzFn(entry)
		if err != nil {
			errMsg = err.Error()
			if errMsg == "" {
				errMsg = "fuzz function failed with no input"
			}
			return dur, nil, errMsg
		}
		if ws.coverageMask != nil && countNewCoverageBits(ws.coverageMask, coverageSnapshot) > 0 {
			return dur, coverageSnapshot, ""
		}
		return dur, nil, ""
	}

	// Warmup: run the original input once, without mutation.
	if args.Warmup {
		dur, _, errMsg := fuzzOnce(CorpusEntry{Values: vals})
		if errMsg != "" {
			resp.Err = errMsg
			return resp
		}
		resp.InterestingDuration = dur
		if coverageEnabled {
			resp.CoverageData = coverageSnapshot
		}
		return resp
	}

	for {
		select {
		case <-ctx.Done():
			return resp
		default:
			// Every chainedMutations iterations, restart the chain from the
			// original input and record the PRNG state at that point. This lets
			// the mutated value be regenerated from shared memory.
			if mem.header().count%chainedMutations == 0 {
				copy(vals, originalVals)
				ws.m.r.save(&mem.header().randState, &mem.header().randInc)
			}
			// NOTE(review): the second argument presumably bounds the size of
			// mutated values by the shared memory buffer capacity — confirm.
			ws.m.mutate(vals, cap(mem.valueRef()))

			entry := CorpusEntry{Values: vals}
			dur, cov, errMsg := fuzzOnce(entry)
			if errMsg != "" {
				resp.Err = errMsg
				return resp
			}
			if cov != nil {
				resp.CoverageData = cov
				resp.InterestingDuration = dur
				return resp
			}
			if shouldStop() {
				return resp
			}
		}
	}
}
   802  
   803  func (ws *workerServer) minimize(ctx context.Context, args minimizeArgs) (resp minimizeResponse) {
   804  	start := time.Now()
   805  	defer func() { resp.Duration = time.Since(start) }()
   806  	mem := <-ws.memMu
   807  	defer func() { ws.memMu <- mem }()
   808  	vals, err := unmarshalCorpusFile(mem.valueCopy())
   809  	if err != nil {
   810  		panic(err)
   811  	}
   812  	inpHash := sha256.Sum256(mem.valueCopy())
   813  	if args.Timeout != 0 {
   814  		var cancel func()
   815  		ctx, cancel = context.WithTimeout(ctx, args.Timeout)
   816  		defer cancel()
   817  	}
   818  
   819  	// Minimize the values in vals, then write to shared memory. We only write
   820  	// to shared memory after completing minimization.
   821  	success, err := ws.minimizeInput(ctx, vals, mem, args)
   822  	if success {
   823  		writeToMem(vals, mem)
   824  		outHash := sha256.Sum256(mem.valueCopy())
   825  		mem.header().rawInMem = false
   826  		resp.WroteToMem = true
   827  		if err != nil {
   828  			resp.Err = err.Error()
   829  		} else {
   830  			// If the values didn't change during minimization then coverageSnapshot is likely
   831  			// a dirty snapshot which represents the very last step of minimization, not the
   832  			// coverage for the initial input. In that case just return the coverage we were
   833  			// given initially, since it more accurately represents the coverage map for the
   834  			// input we are returning.
   835  			if outHash != inpHash {
   836  				resp.CoverageData = coverageSnapshot
   837  			} else {
   838  				resp.CoverageData = args.KeepCoverage
   839  			}
   840  		}
   841  	}
   842  	return resp
   843  }
   844  
   845  // minimizeInput applies a series of minimizing transformations on the provided
   846  // vals, ensuring that each minimization still causes an error, or keeps
   847  // coverage, in fuzzFn. It uses the context to determine how long to run,
   848  // stopping once closed. It returns a bool indicating whether minimization was
   849  // successful and an error if one was found.
   850  func (ws *workerServer) minimizeInput(ctx context.Context, vals []any, mem *sharedMem, args minimizeArgs) (success bool, retErr error) {
   851  	keepCoverage := args.KeepCoverage
   852  	memBytes := mem.valueRef()
   853  	bPtr := &memBytes
   854  	count := &mem.header().count
   855  	shouldStop := func() bool {
   856  		return ctx.Err() != nil ||
   857  			(args.Limit > 0 && *count >= args.Limit)
   858  	}
   859  	if shouldStop() {
   860  		return false, nil
   861  	}
   862  
   863  	// Check that the original value preserves coverage or causes an error.
   864  	// If not, then whatever caused us to think the value was interesting may
   865  	// have been a flake, and we can't minimize it.
   866  	*count++
   867  	_, retErr = ws.fuzzFn(CorpusEntry{Values: vals})
   868  	if keepCoverage != nil {
   869  		if !hasCoverageBit(keepCoverage, coverageSnapshot) || retErr != nil {
   870  			return false, nil
   871  		}
   872  	} else if retErr == nil {
   873  		return false, nil
   874  	}
   875  	mem.header().rawInMem = true
   876  
   877  	// tryMinimized runs the fuzz function with candidate replacing the value
   878  	// at index valI. tryMinimized returns whether the input with candidate is
   879  	// interesting for the same reason as the original input: it returns
   880  	// an error if one was expected, or it preserves coverage.
   881  	tryMinimized := func(candidate []byte) bool {
   882  		prev := vals[args.Index]
   883  		switch prev.(type) {
   884  		case []byte:
   885  			vals[args.Index] = candidate
   886  		case string:
   887  			vals[args.Index] = string(candidate)
   888  		default:
   889  			panic("impossible")
   890  		}
   891  		copy(*bPtr, candidate)
   892  		*bPtr = (*bPtr)[:len(candidate)]
   893  		mem.setValueLen(len(candidate))
   894  		*count++
   895  		_, err := ws.fuzzFn(CorpusEntry{Values: vals})
   896  		if err != nil {
   897  			retErr = err
   898  			if keepCoverage != nil {
   899  				// Now that we've found a crash, that's more important than any
   900  				// minimization of interesting inputs that was being done. Clear out
   901  				// keepCoverage to only minimize the crash going forward.
   902  				keepCoverage = nil
   903  			}
   904  			return true
   905  		}
   906  		// Minimization should preserve coverage bits.
   907  		if keepCoverage != nil && isCoverageSubset(keepCoverage, coverageSnapshot) {
   908  			return true
   909  		}
   910  		vals[args.Index] = prev
   911  		return false
   912  	}
   913  	switch v := vals[args.Index].(type) {
   914  	case string:
   915  		minimizeBytes([]byte(v), tryMinimized, shouldStop)
   916  	case []byte:
   917  		minimizeBytes(v, tryMinimized, shouldStop)
   918  	default:
   919  		panic("impossible")
   920  	}
   921  	return true, retErr
   922  }
   923  
   924  func writeToMem(vals []any, mem *sharedMem) {
   925  	b := marshalCorpusFile(vals...)
   926  	mem.setValue(b)
   927  }
   928  
   929  // ping does nothing. The coordinator calls this method to ensure the worker
   930  // has called F.Fuzz and can communicate.
   931  func (ws *workerServer) ping(ctx context.Context, args pingArgs) pingResponse {
   932  	return pingResponse{}
   933  }
   934  
   935  // workerClient is a minimalist RPC client. The coordinator process uses a
   936  // workerClient to call methods in each worker process (handled by
   937  // workerServer).
   938  type workerClient struct {
   939  	workerComm
   940  	m *mutator
   941  
   942  	// mu is the mutex protecting the workerComm.fuzzIn pipe. This must be
   943  	// locked before making calls to the workerServer. It prevents
   944  	// workerClient.Close from closing fuzzIn while workerClient methods are
   945  	// writing to it concurrently, and prevents multiple callers from writing to
   946  	// fuzzIn concurrently.
   947  	mu sync.Mutex
   948  }
   949  
   950  func newWorkerClient(comm workerComm, m *mutator) *workerClient {
   951  	return &workerClient{workerComm: comm, m: m}
   952  }
   953  
   954  // Close shuts down the connection to the RPC server (the worker process) by
   955  // closing fuzz_in. Close drains fuzz_out (avoiding a SIGPIPE in the worker),
   956  // and closes it after the worker process closes the other end.
   957  func (wc *workerClient) Close() error {
   958  	wc.mu.Lock()
   959  	defer wc.mu.Unlock()
   960  
   961  	// Close fuzzIn. This signals to the server that there are no more calls,
   962  	// and it should exit.
   963  	if err := wc.fuzzIn.Close(); err != nil {
   964  		wc.fuzzOut.Close()
   965  		return err
   966  	}
   967  
   968  	// Drain fuzzOut and close it. When the server exits, the kernel will close
   969  	// its end of fuzzOut, and we'll get EOF.
   970  	if _, err := io.Copy(io.Discard, wc.fuzzOut); err != nil {
   971  		wc.fuzzOut.Close()
   972  		return err
   973  	}
   974  	return wc.fuzzOut.Close()
   975  }
   976  
// errSharedMemClosed is returned by workerClient methods that cannot access
// shared memory because another goroutine closed and unmapped it. This can
// happen when worker.cleanup runs in the worker goroutine concurrently with a
// workerClient call such as fuzz or minimize. Those calls hand the shared
// memory back to wc.memMu while waiting on the worker process, and find the
// channel closed when they try to take it again.
//
// This error should not be reported: it only means the operation was
// interrupted.
var errSharedMemClosed = errors.New("internal error: shared memory was closed and unmapped")
   985  
   986  // minimize tells the worker to call the minimize method. See
   987  // workerServer.minimize.
   988  func (wc *workerClient) minimize(ctx context.Context, entryIn CorpusEntry, args minimizeArgs) (entryOut CorpusEntry, resp minimizeResponse, retErr error) {
   989  	wc.mu.Lock()
   990  	defer wc.mu.Unlock()
   991  
   992  	mem, ok := <-wc.memMu
   993  	if !ok {
   994  		return CorpusEntry{}, minimizeResponse{}, errSharedMemClosed
   995  	}
   996  	defer func() { wc.memMu <- mem }()
   997  	mem.header().count = 0
   998  	inp, err := corpusEntryData(entryIn)
   999  	if err != nil {
  1000  		return CorpusEntry{}, minimizeResponse{}, err
  1001  	}
  1002  	mem.setValue(inp)
  1003  	entryOut = entryIn
  1004  	entryOut.Values, err = unmarshalCorpusFile(inp)
  1005  	if err != nil {
  1006  		return CorpusEntry{}, minimizeResponse{}, fmt.Errorf("workerClient.minimize unmarshaling provided value: %v", err)
  1007  	}
  1008  	for i, v := range entryOut.Values {
  1009  		if !isMinimizable(reflect.TypeOf(v)) {
  1010  			continue
  1011  		}
  1012  
  1013  		wc.memMu <- mem
  1014  		args.Index = i
  1015  		c := call{Minimize: &args}
  1016  		callErr := wc.callLocked(ctx, c, &resp)
  1017  		mem, ok = <-wc.memMu
  1018  		if !ok {
  1019  			return CorpusEntry{}, minimizeResponse{}, errSharedMemClosed
  1020  		}
  1021  
  1022  		if callErr != nil {
  1023  			retErr = callErr
  1024  			if !mem.header().rawInMem {
  1025  				// An unrecoverable error occurred before minimization began.
  1026  				return entryIn, minimizeResponse{}, retErr
  1027  			}
  1028  			// An unrecoverable error occurred during minimization. mem now
  1029  			// holds the raw, unmarshalled bytes of entryIn.Values[i] that
  1030  			// caused the error.
  1031  			switch entryOut.Values[i].(type) {
  1032  			case string:
  1033  				entryOut.Values[i] = string(mem.valueCopy())
  1034  			case []byte:
  1035  				entryOut.Values[i] = mem.valueCopy()
  1036  			default:
  1037  				panic("impossible")
  1038  			}
  1039  			entryOut.Data = marshalCorpusFile(entryOut.Values...)
  1040  			// Stop minimizing; another unrecoverable error is likely to occur.
  1041  			break
  1042  		}
  1043  
  1044  		if resp.WroteToMem {
  1045  			// Minimization succeeded, and mem holds the marshaled data.
  1046  			entryOut.Data = mem.valueCopy()
  1047  			entryOut.Values, err = unmarshalCorpusFile(entryOut.Data)
  1048  			if err != nil {
  1049  				return CorpusEntry{}, minimizeResponse{}, fmt.Errorf("workerClient.minimize unmarshaling minimized value: %v", err)
  1050  			}
  1051  		}
  1052  
  1053  		// Prepare for next iteration of the loop.
  1054  		if args.Timeout != 0 {
  1055  			args.Timeout -= resp.Duration
  1056  			if args.Timeout <= 0 {
  1057  				break
  1058  			}
  1059  		}
  1060  		if args.Limit != 0 {
  1061  			args.Limit -= mem.header().count
  1062  			if args.Limit <= 0 {
  1063  				break
  1064  			}
  1065  		}
  1066  	}
  1067  	resp.Count = mem.header().count
  1068  	h := sha256.Sum256(entryOut.Data)
  1069  	entryOut.Path = fmt.Sprintf("%x", h[:4])
  1070  	return entryOut, resp, retErr
  1071  }
  1072  
// fuzz tells the worker to call the fuzz method. See workerServer.fuzz.
//
// entryIn's data is written to shared memory and the worker fuzzes mutations
// of it. When the RPC fails, the worker reports an error, or (outside of
// warmup) the worker reports new coverage, the mutated input that triggered it
// is rebuilt locally and returned as entryOut. The rebuild restores the
// worker's saved random state and replays its mutations.
//
// isInternalError reports whether err is a problem in the fuzzing machinery
// itself rather than a result of running the fuzz target. When it is false,
// err is the RPC error (callErr), if any.
func (wc *workerClient) fuzz(ctx context.Context, entryIn CorpusEntry, args fuzzArgs) (entryOut CorpusEntry, resp fuzzResponse, isInternalError bool, err error) {
	wc.mu.Lock()
	defer wc.mu.Unlock()

	mem, ok := <-wc.memMu
	if !ok {
		return CorpusEntry{}, fuzzResponse{}, true, errSharedMemClosed
	}
	// Reset the execution counter; the worker increments it once per run.
	mem.header().count = 0
	inp, err := corpusEntryData(entryIn)
	if err != nil {
		wc.memMu <- mem
		return CorpusEntry{}, fuzzResponse{}, true, err
	}
	mem.setValue(inp)
	// Hand shared memory back while the RPC runs. The worker process uses the
	// same memory, and worker.cleanup may reclaim it meanwhile (see
	// errSharedMemClosed).
	wc.memMu <- mem

	c := call{Fuzz: &args}
	callErr := wc.callLocked(ctx, c, &resp)
	if resp.InternalErr != "" {
		return CorpusEntry{}, fuzzResponse{}, true, errors.New(resp.InternalErr)
	}
	mem, ok = <-wc.memMu
	if !ok {
		return CorpusEntry{}, fuzzResponse{}, true, errSharedMemClosed
	}
	defer func() { wc.memMu <- mem }()
	// Take the count from shared memory rather than the RPC response, so it is
	// available even if the call failed (e.g. the worker crashed).
	resp.Count = mem.header().count

	// The worker mutates private copies of the values; the original input in
	// shared memory must be intact for the replay below to be correct.
	if !bytes.Equal(inp, mem.valueRef()) {
		return CorpusEntry{}, fuzzResponse{}, true, errors.New("workerServer.fuzz modified input")
	}
	needEntryOut := callErr != nil || resp.Err != "" ||
		(!args.Warmup && resp.CoverageData != nil)
	if needEntryOut {
		valuesOut, err := unmarshalCorpusFile(inp)
		if err != nil {
			return CorpusEntry{}, fuzzResponse{}, true, fmt.Errorf("unmarshaling fuzz input value after call: %v", err)
		}
		// The worker saves its random state to the header before the first
		// mutation of every chain of chainedMutations runs. Restoring it and
		// replaying the mutations made since then reproduces the last value the
		// worker executed.
		wc.m.r.restore(mem.header().randState, mem.header().randInc)
		if !args.Warmup {
			// Only mutate the valuesOut if fuzzing actually occurred.
			// Number of executions since the last state save. count is
			// incremented once per execution after that chain's mutation.
			numMutations := ((resp.Count - 1) % chainedMutations) + 1
			for i := int64(0); i < numMutations; i++ {
				wc.m.mutate(valuesOut, cap(mem.valueRef()))
			}
		}
		dataOut := marshalCorpusFile(valuesOut...)

		// Name the new entry after a prefix of its content hash.
		h := sha256.Sum256(dataOut)
		name := fmt.Sprintf("%x", h[:4])
		entryOut = CorpusEntry{
			Parent:     entryIn.Path,
			Path:       name,
			Data:       dataOut,
			Generation: entryIn.Generation + 1,
		}
		if args.Warmup {
			// The bytes weren't mutated, so if entryIn was a seed corpus value,
			// then entryOut is too.
			entryOut.IsSeed = entryIn.IsSeed
		}
	}

	return entryOut, resp, false, callErr
}
  1140  
  1141  // ping tells the worker to call the ping method. See workerServer.ping.
  1142  func (wc *workerClient) ping(ctx context.Context) error {
  1143  	wc.mu.Lock()
  1144  	defer wc.mu.Unlock()
  1145  	c := call{Ping: &pingArgs{}}
  1146  	var resp pingResponse
  1147  	return wc.callLocked(ctx, c, &resp)
  1148  }
  1149  
  1150  // callLocked sends an RPC from the coordinator to the worker process and waits
  1151  // for the response. The callLocked may be cancelled with ctx.
  1152  func (wc *workerClient) callLocked(ctx context.Context, c call, resp any) (err error) {
  1153  	enc := json.NewEncoder(wc.fuzzIn)
  1154  	dec := json.NewDecoder(&contextReader{ctx: ctx, r: wc.fuzzOut})
  1155  	if err := enc.Encode(c); err != nil {
  1156  		return err
  1157  	}
  1158  	return dec.Decode(resp)
  1159  }
  1160  
  1161  // contextReader wraps a Reader with a Context. If the context is cancelled
  1162  // while the underlying reader is blocked, Read returns immediately.
  1163  //
  1164  // This is useful for reading from a pipe. Closing a pipe file descriptor does
  1165  // not unblock pending Reads on that file descriptor. All copies of the pipe's
  1166  // other file descriptor (the write end) must be closed in all processes that
  1167  // inherit it. This is difficult to do correctly in the situation we care about
  1168  // (process group termination).
  1169  type contextReader struct {
  1170  	ctx context.Context
  1171  	r   io.Reader
  1172  }
  1173  
  1174  func (cr *contextReader) Read(b []byte) (int, error) {
  1175  	if ctxErr := cr.ctx.Err(); ctxErr != nil {
  1176  		return 0, ctxErr
  1177  	}
  1178  	done := make(chan struct{})
  1179  
  1180  	// This goroutine may stay blocked after Read returns because the underlying
  1181  	// read is blocked.
  1182  	var n int
  1183  	var err error
  1184  	go func() {
  1185  		n, err = cr.r.Read(b)
  1186  		close(done)
  1187  	}()
  1188  
  1189  	select {
  1190  	case <-cr.ctx.Done():
  1191  		return 0, cr.ctx.Err()
  1192  	case <-done:
  1193  		return n, err
  1194  	}
  1195  }
  1196  

View as plain text