Source file src/internal/trace/order.go

     1  // Copyright 2016 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package trace
     6  
     7  import (
     8  	"fmt"
     9  	"sort"
    10  )
    11  
    12  type eventBatch struct {
    13  	events   []*Event
    14  	selected bool
    15  }
    16  
    17  type orderEvent struct {
    18  	ev    *Event
    19  	batch int
    20  	g     uint64
    21  	init  gState
    22  	next  gState
    23  }
    24  
    25  type gStatus int
    26  
    27  type gState struct {
    28  	seq    uint64
    29  	status gStatus
    30  }
    31  
    32  const (
    33  	gDead gStatus = iota
    34  	gRunnable
    35  	gRunning
    36  	gWaiting
    37  
    38  	unordered = ^uint64(0)
    39  	garbage   = ^uint64(0) - 1
    40  	noseq     = ^uint64(0)
    41  	seqinc    = ^uint64(0) - 1
    42  )
    43  
    44  // order1007 merges a set of per-P event batches into a single, consistent stream.
    45  // The high level idea is as follows. Events within an individual batch are in
    46  // correct order, because they are emitted by a single P. So we need to produce
    47  // a correct interleaving of the batches. To do this we take first unmerged event
    48  // from each batch (frontier). Then choose subset that is "ready" to be merged,
    49  // that is, events for which all dependencies are already merged. Then we choose
    50  // event with the lowest timestamp from the subset, merge it and repeat.
    51  // This approach ensures that we form a consistent stream even if timestamps are
    52  // incorrect (condition observed on some machines).
    53  func order1007(m map[int][]*Event) (events []*Event, err error) {
    54  	pending := 0
    55  	// The ordering of CPU profile sample events in the data stream is based on
    56  	// when each run of the signal handler was able to acquire the spinlock,
    57  	// with original timestamps corresponding to when ReadTrace pulled the data
    58  	// off of the profBuf queue. Re-sort them by the timestamp we captured
    59  	// inside the signal handler.
    60  	sort.Stable(eventList(m[ProfileP]))
    61  	var batches []*eventBatch
    62  	for _, v := range m {
    63  		pending += len(v)
    64  		batches = append(batches, &eventBatch{v, false})
    65  	}
    66  	gs := make(map[uint64]gState)
    67  	var frontier []orderEvent
    68  	for ; pending != 0; pending-- {
    69  		for i, b := range batches {
    70  			if b.selected || len(b.events) == 0 {
    71  				continue
    72  			}
    73  			ev := b.events[0]
    74  			g, init, next := stateTransition(ev)
    75  			if !transitionReady(g, gs[g], init) {
    76  				continue
    77  			}
    78  			frontier = append(frontier, orderEvent{ev, i, g, init, next})
    79  			b.events = b.events[1:]
    80  			b.selected = true
    81  			// Get rid of "Local" events, they are intended merely for ordering.
    82  			switch ev.Type {
    83  			case EvGoStartLocal:
    84  				ev.Type = EvGoStart
    85  			case EvGoUnblockLocal:
    86  				ev.Type = EvGoUnblock
    87  			case EvGoSysExitLocal:
    88  				ev.Type = EvGoSysExit
    89  			}
    90  		}
    91  		if len(frontier) == 0 {
    92  			return nil, fmt.Errorf("no consistent ordering of events possible")
    93  		}
    94  		sort.Sort(orderEventList(frontier))
    95  		f := frontier[0]
    96  		frontier[0] = frontier[len(frontier)-1]
    97  		frontier = frontier[:len(frontier)-1]
    98  		events = append(events, f.ev)
    99  		transition(gs, f.g, f.init, f.next)
   100  		if !batches[f.batch].selected {
   101  			panic("frontier batch is not selected")
   102  		}
   103  		batches[f.batch].selected = false
   104  	}
   105  
   106  	// At this point we have a consistent stream of events.
   107  	// Make sure time stamps respect the ordering.
   108  	// The tests will skip (not fail) the test case if they see this error.
   109  	if !sort.IsSorted(eventList(events)) {
   110  		return nil, ErrTimeOrder
   111  	}
   112  
   113  	// The last part is giving correct timestamps to EvGoSysExit events.
   114  	// The problem with EvGoSysExit is that actual syscall exit timestamp (ev.Args[2])
   115  	// is potentially acquired long before event emission. So far we've used
   116  	// timestamp of event emission (ev.Ts).
   117  	// We could not set ev.Ts = ev.Args[2] earlier, because it would produce
   118  	// seemingly broken timestamps (misplaced event).
   119  	// We also can't simply update the timestamp and resort events, because
   120  	// if timestamps are broken we will misplace the event and later report
   121  	// logically broken trace (instead of reporting broken timestamps).
   122  	lastSysBlock := make(map[uint64]int64)
   123  	for _, ev := range events {
   124  		switch ev.Type {
   125  		case EvGoSysBlock, EvGoInSyscall:
   126  			lastSysBlock[ev.G] = ev.Ts
   127  		case EvGoSysExit:
   128  			ts := int64(ev.Args[2])
   129  			if ts == 0 {
   130  				continue
   131  			}
   132  			block := lastSysBlock[ev.G]
   133  			if block == 0 {
   134  				return nil, fmt.Errorf("stray syscall exit")
   135  			}
   136  			if ts < block {
   137  				return nil, ErrTimeOrder
   138  			}
   139  			ev.Ts = ts
   140  		}
   141  	}
   142  	sort.Stable(eventList(events))
   143  
   144  	return
   145  }
   146  
   147  // stateTransition returns goroutine state (sequence and status) when the event
   148  // becomes ready for merging (init) and the goroutine state after the event (next).
   149  func stateTransition(ev *Event) (g uint64, init, next gState) {
   150  	switch ev.Type {
   151  	case EvGoCreate:
   152  		g = ev.Args[0]
   153  		init = gState{0, gDead}
   154  		next = gState{1, gRunnable}
   155  	case EvGoWaiting, EvGoInSyscall:
   156  		g = ev.G
   157  		init = gState{1, gRunnable}
   158  		next = gState{2, gWaiting}
   159  	case EvGoStart, EvGoStartLabel:
   160  		g = ev.G
   161  		init = gState{ev.Args[1], gRunnable}
   162  		next = gState{ev.Args[1] + 1, gRunning}
   163  	case EvGoStartLocal:
   164  		// noseq means that this event is ready for merging as soon as
   165  		// frontier reaches it (EvGoStartLocal is emitted on the same P
   166  		// as the corresponding EvGoCreate/EvGoUnblock, and thus the latter
   167  		// is already merged).
   168  		// seqinc is a stub for cases when event increments g sequence,
   169  		// but since we don't know current seq we also don't know next seq.
   170  		g = ev.G
   171  		init = gState{noseq, gRunnable}
   172  		next = gState{seqinc, gRunning}
   173  	case EvGoBlock, EvGoBlockSend, EvGoBlockRecv, EvGoBlockSelect,
   174  		EvGoBlockSync, EvGoBlockCond, EvGoBlockNet, EvGoSleep,
   175  		EvGoSysBlock, EvGoBlockGC:
   176  		g = ev.G
   177  		init = gState{noseq, gRunning}
   178  		next = gState{noseq, gWaiting}
   179  	case EvGoSched, EvGoPreempt:
   180  		g = ev.G
   181  		init = gState{noseq, gRunning}
   182  		next = gState{noseq, gRunnable}
   183  	case EvGoUnblock, EvGoSysExit:
   184  		g = ev.Args[0]
   185  		init = gState{ev.Args[1], gWaiting}
   186  		next = gState{ev.Args[1] + 1, gRunnable}
   187  	case EvGoUnblockLocal, EvGoSysExitLocal:
   188  		g = ev.Args[0]
   189  		init = gState{noseq, gWaiting}
   190  		next = gState{seqinc, gRunnable}
   191  	case EvGCStart:
   192  		g = garbage
   193  		init = gState{ev.Args[0], gDead}
   194  		next = gState{ev.Args[0] + 1, gDead}
   195  	default:
   196  		// no ordering requirements
   197  		g = unordered
   198  	}
   199  	return
   200  }
   201  
   202  func transitionReady(g uint64, curr, init gState) bool {
   203  	return g == unordered || (init.seq == noseq || init.seq == curr.seq) && init.status == curr.status
   204  }
   205  
   206  func transition(gs map[uint64]gState, g uint64, init, next gState) {
   207  	if g == unordered {
   208  		return
   209  	}
   210  	curr := gs[g]
   211  	if !transitionReady(g, curr, init) {
   212  		panic("event sequences are broken")
   213  	}
   214  	switch next.seq {
   215  	case noseq:
   216  		next.seq = curr.seq
   217  	case seqinc:
   218  		next.seq = curr.seq + 1
   219  	}
   220  	gs[g] = next
   221  }
   222  
   223  // order1005 merges a set of per-P event batches into a single, consistent stream.
   224  func order1005(m map[int][]*Event) (events []*Event, err error) {
   225  	for _, batch := range m {
   226  		events = append(events, batch...)
   227  	}
   228  	for _, ev := range events {
   229  		if ev.Type == EvGoSysExit {
   230  			// EvGoSysExit emission is delayed until the thread has a P.
   231  			// Give it the real sequence number and time stamp.
   232  			ev.seq = int64(ev.Args[1])
   233  			if ev.Args[2] != 0 {
   234  				ev.Ts = int64(ev.Args[2])
   235  			}
   236  		}
   237  	}
   238  	sort.Sort(eventSeqList(events))
   239  	if !sort.IsSorted(eventList(events)) {
   240  		return nil, ErrTimeOrder
   241  	}
   242  	return
   243  }
   244  
   245  type orderEventList []orderEvent
   246  
   247  func (l orderEventList) Len() int {
   248  	return len(l)
   249  }
   250  
   251  func (l orderEventList) Less(i, j int) bool {
   252  	return l[i].ev.Ts < l[j].ev.Ts
   253  }
   254  
   255  func (l orderEventList) Swap(i, j int) {
   256  	l[i], l[j] = l[j], l[i]
   257  }
   258  
   259  type eventList []*Event
   260  
   261  func (l eventList) Len() int {
   262  	return len(l)
   263  }
   264  
   265  func (l eventList) Less(i, j int) bool {
   266  	return l[i].Ts < l[j].Ts
   267  }
   268  
   269  func (l eventList) Swap(i, j int) {
   270  	l[i], l[j] = l[j], l[i]
   271  }
   272  
   273  type eventSeqList []*Event
   274  
   275  func (l eventSeqList) Len() int {
   276  	return len(l)
   277  }
   278  
   279  func (l eventSeqList) Less(i, j int) bool {
   280  	return l[i].seq < l[j].seq
   281  }
   282  
   283  func (l eventSeqList) Swap(i, j int) {
   284  	l[i], l[j] = l[j], l[i]
   285  }
   286  

View as plain text