-
Golang 并發實現
goroutine channels select
查看全部 -
常見并發模型
1,進程和線程模型,例如apache
2,異步非阻塞,例如nginx,nodejs
3,協程,例如golang,erlang,lua
查看全部 -
package?main import?( ???"strings" ???"fmt" ???"time" ???"os" ???"bufio" ???"io" ???"regexp" ???"log" ???"strconv" ???"net/url" ???"github.com/influxdata/influxdb/client/v2" ???"flag" ???"net/http" ???"encoding/json" ) type?Reader?interface?{ ???Read(rc?chan?[]byte) } type?Writer?interface?{ ???Write(wc?chan?*Message) } type?ReadFromFile?struct?{ ???path?string?//log?file?path } type?WriteToInfluxDB?struct?{ ???influxDBDsn?string?//influx?data?source } type?Message?struct?{ ???TimeLocal?time.Time ???BytesSent?int ???Path,?Method,?Scheme,?Status?string ???UpstreamTime,?RequestTime?float64 } type?LogProcess?struct?{ ???rc?chan?[]byte?//read?channel ???wc?chan?*Message?//write?channel ???read?Reader ???write?Writer } type?SystemInfo?struct?{ ???HandleLine?int?`json:"handleLine"`?//總處理日志行數 ???Tps?float64?`json:"tps"`?//系統吞吐量 ???ReadChanlen?int?`json:"readChanlen"`?//讀信道長度 ???WriteChanlen?int?`json:"writeChanlen"`?//寫信道長度 ???RunTime?string?`json:"runTime"`?//總運行時間 ???ErrNum?int?`json:"errNum"`?//錯誤數 } type?Monitor?struct?{ ???startTime?time.Time ???data?SystemInfo ???tpSli?[]int } const?( ???TypeHandleLine?=?0 ???TypeErrNum?=?1 ) var?TypeMonitorChan?=?make(chan?int,?200) func?(m?*Monitor)?start(lp?*LogProcess)?{ ???go?func()?{ ??????for?n?:=?range?TypeMonitorChan{ ?????????switch?n?{ ?????????case?TypeErrNum: ????????????m.data.ErrNum?+=?1 ?????????case?TypeHandleLine: ????????????m.data.HandleLine?+=?1 ?????????} ??????} ???}() ???ticker?:=?time.NewTicker(5*time.Second) ???go?func()?{ ??????for?{ ?????????<-ticker.C ?????????m.tpSli?=?append(m.tpSli,?m.data.HandleLine) ?????????if?len(m.tpSli)?>?2?{ ????????????m.tpSli?=?m.tpSli[1:] ?????????} ??????} ???}() ???http.HandleFunc("/monitor",?func(writer?http.ResponseWriter,?request?*http.Request)?{ ??????m.data.RunTime?=?time.Now().Sub(m.startTime).String() ??????m.data.ReadChanlen?=?len(lp.rc) ??????m.data.WriteChanlen?=?len(lp.wc) ??????if?len(m.tpSli)?>=?2?{ ?????????m.data.Tps?=?float64(m.tpSli[1]-m.tpSli[0])?/?5 
??????} ??????ret,?_?:=?json.MarshalIndent(m.data,?"",?"\t") ??????io.WriteString(writer,?string(ret)) ???}) ???http.ListenAndServe(":9193",?nil) } func?(r?*ReadFromFile)?Read(rc?chan?[]byte)?{ ???//log?reading?module ???//打開文件 ???f,?err?:=?os.Open(r.path) ???if?err!=?nil?{ ??????TypeMonitorChan?<-?TypeErrNum ??????panic(fmt.Sprintf("open?file?error:%s",?err.Error())) ???} ???//從文件末尾開始逐行讀取文件內容 ???f.Seek(0,?2) ???rd?:=?bufio.NewReader(f) ???for?{ ??????line,?err?:=?rd.ReadBytes('\n') ??????if?err?==?io.EOF?{ ?????????time.Sleep(500*time.Millisecond) ?????????continue ??????}?else?if?err?!=?nil?{ ?????????TypeMonitorChan?<-?TypeErrNum ?????????panic(fmt.Sprintf("ReadBytes?error:%s",?err.Error())) ??????} ??????TypeMonitorChan?<-?TypeHandleLine ??????rc?<-?line[:len(line)-1] ???} } func?(w?*WriteToInfluxDB)?Write(wc?chan?*Message)?{ ???//log?writing?module ???infSli?:=?strings.Split(w.influxDBDsn,?"@") ???//?Create?a?new?HTTPClient ???c,?err?:=?client.NewHTTPClient(client.HTTPConfig{ ??????Addr:?????infSli[0], ??????Username:?infSli[1], ??????Password:?infSli[2], ???}) ???if?err?!=?nil?{ ??????TypeMonitorChan?<-?TypeErrNum ??????log.Fatal(err) ???} ???defer?c.Close() ???//?Create?a?new?point?batch ???bp,?err?:=?client.NewBatchPoints(client.BatchPointsConfig{ ??????Database:??infSli[3], ??????Precision:?infSli[4], ???}) ???if?err?!=?nil?{ ??????TypeMonitorChan?<-?TypeErrNum ??????log.Fatal(err) ???} ???for?v?:=?range?wc?{ ??????//?Create?a?point?and?add?to?batch ??????tags?:=?map[string]string{"Path":?v.Path,?"Method":?v.Method,?"Scheme":?v.Scheme,?"Status":?v.Status} ??????fields?:=?map[string]interface{}{ ?????????"UpstreamTime":?v.UpstreamTime, ?????????"RequestTime":?v.RequestTime, ?????????"BytesSent":?v.BytesSent, ??????} ??????pt,?err?:=?client.NewPoint("nginx_log",?tags,?fields,?v.TimeLocal) ??????if?err?!=?nil?{ ?????????log.Fatal(err) ??????} ??????bp.AddPoint(pt) ??????//?Write?the?batch ??????if?err?:=?c.Write(bp);?err?!=?nil?{ 
?????????TypeMonitorChan?<-?TypeErrNum ?????????log.Fatal(err) ??????} ??????//?Close?client?resources ??????if?err?:=?c.Close();?err?!=?nil?{ ?????????TypeMonitorChan?<-?TypeErrNum ?????????log.Fatal(err) ??????} ??????log.Println("Write?success") ???} } func?(l?*LogProcess)?Process()?{ ???//log?parsing?module ???r?:=?regexp.MustCompile(`([\d\.]+)\s+([^?\[]+)\s+([^?\[]+)\s+\[([^\]]+)\]\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+)`) ???loc,?_?:=?time.LoadLocation("Asia/Shanghai") ???for?v?:=?range?l.rc?{ ??????ret?:=?r.FindStringSubmatch(string(v)) ??????if?len(ret)?!=?13?{ ?????????TypeMonitorChan?<-?TypeErrNum ?????????log.Println("FindStringSubMatch?fail:",?string(v)) ?????????continue ??????} ??????message?:=?&Message{} ??????t,?err?:=?time.ParseInLocation("02/Jan/2006:15:04:05?+0800",?ret[4],?loc) ??????if?err?!=?nil?{ ?????????TypeMonitorChan?<-?TypeErrNum ?????????log.Println("ParseInLocation?fail",?err.Error(),?ret[4]) ?????????continue ??????} ??????message.TimeLocal?=?t ??????byteSent,?_?:=?strconv.Atoi(ret[7]) ??????message.BytesSent?=?byteSent ??????reqSli?:=?strings.Split(ret[5],?"?") ??????if?len(reqSli)?!=?3?{ ?????????TypeMonitorChan?<-?TypeErrNum ?????????log.Println("strings.Split?fail",?ret[5]) ?????????continue ??????} ??????message.Method?=?reqSli[0] ??????u,?err?:=?url.Parse(reqSli[1]) ??????if?err?!=?nil?{ ?????????log.Println("url?parse?fail",?err) ?????????continue ??????} ??????message.Path?=?u.Path ??????//message.Scheme?=?ret[5] ??????message.Status?=?ret[6] ??????upstreamTime,?_?:=?strconv.ParseFloat(ret[11],?64) ??????requestTime,?_?:=?strconv.ParseFloat(ret[12],?64) ??????message.UpstreamTime?=?upstreamTime ??????message.RequestTime?=?requestTime ??????l.wc?<-?message ???} ???//127.0.0.1?-?-?[14/May/2018:14:52:45?+0800]?"GET?/foo?query=t?HTTP/1.1"?404?27?"-"?"KeepAliveClient"?"-"?1.005?1.854 
???//([\d\.]+)\s+([^?\[]+)\s+([^?\[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+) } func?main()?{ ???var?path,?influxDsn?string ???flag.StringVar(&path,?"path",?"/usr/local/var/log/nginx/sd-mac-access-8081.log",?"read?file?path") ???flag.StringVar(&influxDsn,?"influxDsn",?"http://127.0.0.1:8086@wangxu@wangxu26@imooc@s",?"influx?data?source") ???flag.Parse() ???r?:=?&ReadFromFile{ ??????path:path, ???} ???w?:=?&WriteToInfluxDB{ ??????influxDBDsn:influxDsn, ???} ???lp?:=?&LogProcess{ ??????//使用帶緩存的channel,防止阻塞 ??????rc:make(chan?[]byte,?200), ??????wc:make(chan?*Message,?200), ??????read:r, ??????write:w, ???} ???//開兩個讀goroutines ???for?i?:=?0;?i?<?2;?i++?{ ??????go?lp.read.Read(lp.rc) ???} ???go?lp.Process() ???//讀比寫要快,多開幾個寫的goroutines ???for?i?:=?0;?i?<?4;?i++?{ ??????go?lp.write.Write(lp.wc) ???} ???//time.Sleep(30*time.Second) ???m?:=?Monitor{ ??????startTime:time.Now(), ??????data:SystemInfo{}, ???} ???m.start(lp) }
查看全部 -
常見的并發模型
1,進程和線程模型,例如apache
2,異步非阻塞,例如nginx,nodejs
3,協程,例如golang,erlang,lua
查看全部 -
golang分析nginx日志
使用influxdb存儲
使用grafana展示
查看全部 -
任務的合理的拆分
查看全部 -
Influxdb關鍵概念
查看全部 -
Influxdb簡介
查看全部 -
日志監控系統——寫入模塊
查看全部 -
日志監控系統——解析模塊
查看全部 -
日志監控系統——讀取模塊的實現
查看全部 -
golang中的面向對象
查看全部 -
并發與并行
查看全部 -
golang并發實現——select
查看全部 -
golang并發實現——channels
查看全部
舉報
0/150
提交
取消