为了账号安全,请及时绑定邮箱和手机立即绑定

为什么我照着敲的,去掉sleep后,它监听哪个9193端口,但好像不是阻塞式的,执行go run log_process.go立刻就退出了

https://img1.sycdn.imooc.com//5af2fb980001a2f225341520.jpg


为什么我照着敲的,去掉sleep后,它监听哪个9193端口,但好像不是阻塞式的,执行go run log_process.go

正在回答

2 回答

大神,上面那个是log_process.go的代码

0 回复 有任何疑惑可以回复我~
#1

麦可同学

我刚测试了下,没有问题的,,你是什么环境?你可以写一个最小化的测试代码试试,分别做下排查。
2018-05-10 回复 有任何疑惑可以回复我~
#2

坐着看太阳敲代码 提问者 回复 麦可同学

这么奇怪,是在mac配置的go环境。那我自己写个测试代码,看看行不行。
2018-05-13 回复 有任何疑惑可以回复我~
package main

import (
   "strings"
   "fmt"
   "time"
   "os"
   "bufio"
   "io"
   "regexp"
   "log"
   "strconv"
   "net/url"
   "flag"
   "net/http"
   //"influxdb-master/client/v2"
   "github.com/influxdata/influxdb/client/v2"

   "encoding/json"
)

// 系统状态监控
type SystemInfo struct {
   HandleLine   int     `json:"handleLine"`   // 总处理日志行数
   Tps          float64 `json:"tps"`          // 系统吞出量
   ReadChanLen  int     `json:"readChanLen"`  // read channel 长度
   WriteChanLen int     `json:"writeChanLen"` // write channel 长度
   RunTime      string  `json:"runTime"`      // 运行总时间
   ErrNum       int     `json:"errNum"`       // 错误数
}

const (
   TypeHandleLine = 0
   TypeErrNum     = 1
)

var TypeMonitorChan = make(chan int, 200)

type Monitor struct {
   startTime time.Time
   data      SystemInfo
   tpsSli    []int
}

func (m *Monitor) start(lp *LogProcess) {

   go func() {
      for n := range TypeMonitorChan {
         switch n {
         case TypeErrNum:
            m.data.ErrNum += 1
         case TypeHandleLine:
            m.data.HandleLine += 1
         }
      }
   }()

   ticker := time.NewTicker(time.Second * 5)
   go func() { //协程
      for {
         <-ticker.C
         m.tpsSli = append(m.tpsSli, m.data.HandleLine)
         if len(m.tpsSli) > 2 {
            m.tpsSli = m.tpsSli[1:]
         }
      }
   }()

   http.HandleFunc("/monitor", func(writer http.ResponseWriter, request *http.Request) {
      m.data.RunTime = time.Now().Sub(m.startTime).String()
      m.data.ReadChanLen = len(lp.rc)
      m.data.WriteChanLen = len(lp.wc)

      if len(m.tpsSli) >= 2 {
         m.data.Tps = float64(m.tpsSli[1]-m.tpsSli[0]) / 5
      }

      ret, _ := json.MarshalIndent(m.data, "", "\t")
      io.WriteString(writer, string(ret))
   })

   http.ListenAndServe(":9193", nil)
}

type Reader interface {
   Read(rc chan []byte)
}
type Writer interface {
   Write(wc chan *Message)
}
type LogProcess struct {
   rc chan []byte
   wc chan *Message
   //path        string //读取文件的路径
   //influxDBDsn string //influx data source
   read  Reader
   write Writer
}

type ReadFromToInFile struct {
   path string //获取文件路径
}
type WriteFromTofluxDB struct {
   influxDBDsn string //influx data source
}

type Message struct {
   TimeLocal                    time.Time
   BytesSent                    int
   Path, Method, Scheme, Status string
   UpstreamTime, RequestTime    float64
}

func (r *ReadFromToInFile) Read(rc chan []byte) {
   //读取模块
   //打开文件

   f, err := os.Open(r.path) //返回file类型的结构体指针,因为这个结构体只能逐个读取这个字符,所以下面会换成newreader

   if err != nil {
      panic(fmt.Sprint("open file error:%s", err.Error()))
   }

   //从文件末尾开始逐行读取文件内容
   f.Seek(0, 2)             //把字符指针移动到末尾,参数2就是移动到末尾的意思
   rd := bufio.NewReader(f) //这样它会返回新的NewReader的类型指针,以至于可以使用更多的方法,逐行读取

   for {
      line, err := rd.ReadBytes('\n') //逐行读取;又或者是读取直到遇见换行符
      if err == io.EOF { //EOf代表是文件末尾的错误
         time.Sleep(500 * time.Millisecond)
         continue
      } else if err != nil {
         panic(fmt.Sprintf("ReadBytes error:%s", err.Error()))
      }
      //print(line)
      TypeMonitorChan <- TypeHandleLine
      rc <- line[:len(line)-1]
   }

   //rc<-line
   //
   //line := "message"
   //rc <- line
}
func (w *WriteFromTofluxDB) Write(wc chan *Message) {
   //写入模块

   infSli := strings.Split(w.influxDBDsn, "@")
   c, err := client.NewHTTPClient(client.HTTPConfig{
      Addr:     infSli[0],
      Username: infSli[1],
      Password: infSli[2],
   })
   if err != nil {
      log.Fatal(err)
   }
   defer c.Close()

   // Create a new point batch
   bp, err := client.NewBatchPoints(client.BatchPointsConfig{
      Database:  infSli[3],
      Precision: infSli[4],
   })
   if err != nil {
      log.Fatal(err)
   }

   //fmt.Println(<-wc)
   for v := range wc {
      // Create a point and add to batch
      tags := map[string]string{"Path": v.Path, "Method": v.Method, "Scheme": v.Scheme, "Status": v.Status}
      fields := map[string]interface{}{
         "UpstreamTime": v.UpstreamTime,
         "RequestTime":  v.RequestTime,
         "BytesSent":    v.BytesSent,
      }

      pt, err := client.NewPoint("nginx_log", tags, fields, v.TimeLocal)
      if err != nil {
         log.Fatal(err)
      }
      bp.AddPoint(pt)

      // Write the batch
      if err := c.Write(bp); err != nil {
         log.Fatal(err)
      }
      log.Println("write success")
      // Close client resources
      //if err := c.Close(); err != nil {
      // log.Fatal(err)
      //}
      //fmt.Println(v)
   }

}

//func (l *LogProcess) ReadFromFile() {
// //读取模块
// line := "message"
// l.rc <- line
//}

func (l *LogProcess) Proccess() {
   // 解析模块

   /**
   172.0.0.12 - - [04/Mar/2018:13:49:52 +0000] http "GET /foo?query=t HTTP/1.0" 200 2133 "-" "KeepAliveClient" "-" 1.005 1.854
   */

   r := regexp.MustCompile(`([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+)`)

   loc, _ := time.LoadLocation("Asia/Shanghai")
   for v := range l.rc {
      ret := r.FindStringSubmatch(string(v))

      //log.Println(ret, len(ret))
      if len(ret) != 14 {
         TypeMonitorChan <- TypeErrNum
         log.Println("FindStringSubmatch fail:", string(v))
         continue
      }

      message := &Message{}
      t, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc)
      if err != nil {
         TypeMonitorChan <- TypeErrNum
         log.Println("ParseInLocation fail:", err.Error(), ret[4])
         continue
      }
      message.TimeLocal = t

      byteSent, _ := strconv.Atoi(ret[8])
      message.BytesSent = byteSent

      // GET /foo?query=t HTTP/1.0
      reqSli := strings.Split(ret[6], " ")
      if len(reqSli) != 3 {
         TypeMonitorChan <- TypeErrNum
         log.Println("strings.Split fail", ret[6])
         continue
      }
      message.Method = reqSli[0]

      u, err := url.Parse(reqSli[1])
      if err != nil {
         log.Println("url parse fail:", err)
         TypeMonitorChan <- TypeErrNum
         continue
      }
      message.Path = u.Path

      message.Scheme = ret[5]
      message.Status = ret[7]

      upstreamTime, _ := strconv.ParseFloat(ret[12], 64)
      requestTime, _ := strconv.ParseFloat(ret[13], 64)
      message.UpstreamTime = upstreamTime
      message.RequestTime = requestTime

      l.wc <- message
   }
}

//func (l *LogProcess) WriteToInfluxDB() {
// //写入模块
// fmt.Println(<-l.wc)
// fmt.Println(<-l.wc)
//}

func main() {
   var path, influxDsn string
   flag.StringVar(&path, "path", "./access.log", "read file path")

   flag.StringVar(&influxDsn, "influxDsn", "http://127.0.0.1:8086@root@101@imooc@s", "influx data source")
   flag.Parse()

   r := &ReadFromToInFile{
      path: path,
   }
   w := &WriteFromTofluxDB{
      influxDBDsn: influxDsn,
   }
   lp := &LogProcess{
      rc:    make(chan []byte, 200),
      wc:    make(chan *Message, 200),
      read:  r,
      write: w,
   }
   //go lp.ReadFromFile() //lp是引用类型,出于性能考虑这样做。本来这里应该这样写 go (*lp).ReadFromFile(),
   //// 但goland编辑器对此优化,写成go lp.ReadFromFile(),也能达到上面一样的效果。知道他是个结构体。更利于阅读
   //go lp.Proccess()
   //go lp.WriteToInfluxDB()
   //time.Sleep(1 * time.Second)

   go lp.read.Read(lp.rc) //lp是引用类型,出于性能考虑这样做。本来这里应该这样写 go (*lp).ReadFromFile(),
   // 但goland编辑器对此优化,写成go lp.ReadFromFile(),也能达到上面一样的效果。知道他是个结构体。更利于阅读
   for i := 0; i < 2; i++ {
      go lp.Proccess()
   }
   for i := 0; i < 4; i++ {
      go lp.write.Write(lp.wc)
   }

   //
   //m := &Monitor{
   // startTime: time.Now(),
   // data:      SystemInfo{},
   //}
   //m.start(lp)

   m := &Monitor{
      startTime: time.Now(),
      data:      SystemInfo{},
   }
   m.start(lp)
   //time.Sleep(30 * time.Second)
}
1 回复 有任何疑惑可以回复我~

举报

0/150
提交
取消

为什么我照着敲的,去掉sleep后,它监听哪个9193端口,但好像不是阻塞式的,执行go run log_process.go立刻就退出了

我要回答 关注问题
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号