package main

import (
	"bufio"
	"fmt"
	"io"
	"log"
	"os"
	"regexp"
	"strconv"
	"strings"
	"time"
)
// Reader is the input stage of the pipeline. An implementation delivers the
// log lines it obtains by sending them on rc.
type Reader interface {
	// Read sends log lines to rc.
	Read(rc chan string)
}
// Writer is the output stage of the pipeline. An implementation consumes the
// values it receives from wc.
type Writer interface {
	// Write receives values from wc and stores or emits them.
	Write(wc chan interface{})
}
type?LogProcess?struct?{
???rc??chan?string???//?in?file?get?message
???wc??chan?interface{}???//?out?message?in?writer
????read?Reader
????write?Writer
}
type?Message?struct?{
???IP?string
???Logtime?time.Time
???Url??string
???Code?int
???Length?float64
???Refer??string
???Client?string
}
/*
???1.?從?channel?中讀取每行日志數據
???2.?正則提取所需的數據
???3.?寫入到?writer?channel
?*/
func?(l?*LogProcess)?Process()??{
???/*
?????nginx?log?format
?????192.168.252.210?-?-?[03/Nov/2016:16:56:47?+0800]?"POST?/jsrpc.php?output=json-rpc?HTTP/1.1"?200?149?"http://ip:port/zabbix.php?action=dashboard.view"?"Mozilla/5.0?(Windows?NT?6.1;?WOW64)?AppleWebKit/537.36(KHTML,?like?Gecko)?Maxthon/4.9.3.1000?Chrome/39.0.2146.0?Safari/537.36"
?????grok?format
?????(?m)(?<ip>[\d+.]+)\s+(?<drop>[^\[]+)\s+\[(?<logtime>[^\]]+)\]\s+\"(?<url>[^"]+)\"\s+(?<code>\d+)\s+(?<length>\d+)\s+\"(?<refer>[^"]+)\"\s+\"(?<client>[^"]+)
???*/
???//?預格式化
???rex?:=?regexp.MustCompile(`([\d+.]+)\s+([^\[]+)\s+\[([^\]]+)\]\s+\"([^"]+)\"\s+(\d+)\s+(\d+)\s+\"([^"]+)\"\s+\"([^"]+)`)
???//?setting?time?zone
???loc,?_?:=?time.LoadLocation("Asia/Shanghai")
???for?v?:=?range?l.rc?{
??????//?利用正則分組的原則,將提取的字段分組
??????ret?:=?rex.FindStringSubmatch(v)
??????if?len(ret)?!=?9?{
?????????log.Println("match?is?error",?v)
?????????continue
??????}
??????//init?message?struct
??????message?:=?&Message{}
??????//?log?fromat?is??03/Nov/2016:16:56:47?+0800
??????//?RFC1123Z????=?"Mon,?02?Jan?2006?15:04:05?-0700"?//?RFC1123?with?numeric?zone
??????//?我在這里必須要使用這個時間戳,RFC1123Z?格式?不然出錯信息如下
??????//?2018/07/19?21:00:09?ParseInLocation?fail:?parsing?time?"03/Nov/2016:16:56:47?+0800"?as?"02/Jan/2006:15:04:05?+0000":?cannot?parse?"800"?as?"?+0000"?03/Nov/2016:16:56:47?+0800
??????//
??????t,?err?:=?time.ParseInLocation("02/Jan/2006:15:04:05?-0700",?ret[3],?loc)
??????if?err?!=?nil?{
?????????log.Println("ParseInLocation?fail:",?err.Error(),?ret[3])
??????}
??????message.Logtime?=?t
??????message.IP?=?ret[1]
??????//?這里偷懶了
??????message.Code,?_?=?strconv.Atoi(ret[5])
??????message.Length,?_?=?strconv.ParseFloat(ret[6],?64)
??????message.Client?=?ret[8]
??????message.Refer?=?ret[7]
??????message.Url?=?ret[4]
??????l.wc?<-?message
???}
}
type?ReadFromFile?struct?{
???path??????????string????//?file?storage?path
}
/*
???1.?讀取模塊
??????a.?打開文件
??????b.?從文件?末尾?開始逐行讀取
?*/
func?(r?*ReadFromFile)?Read(rc?chan?string){
???//?line?:=?"string?in?message"
???file,?err?:=?os.Open(r.path)
???if?err?!=?nil?{
??????panic(fmt.Sprintf("open?file?error:?%s",?err.Error()))
???}
???defer?file.Close()
???//?從文件末尾開始逐行讀取
???file.Seek(0,?2)
???rd?:=?bufio.NewReader(file)
???for?{
??????//?why?is?not?use?method?rd.ReadString()
??????line,?err?:=?rd.ReadString('\n')
??????if?err?!=?nil?{
?????????if?err?==?io.EOF?{
????????????time.Sleep(500?*?time.Microsecond)
????????????continue
?????????}
?????????panic(fmt.Sprintf("ReadString?error:?%s",?err.Error()))
??????}
??????//rc?<-?line[:len(line)-1]
??????rc?<-?strings.TrimSpace(line)
???}
}
// WriterToinfluxDB is the Writer used by main. It carries an InfluxDB DSN,
// but Write does not use it yet and only prints the records.
type WriterToinfluxDB struct {
	influxDBDsn string // influxDB dsn
}

// Write prints every value received from wc to standard output and returns
// once wc has been closed and drained.
func (w *WriterToinfluxDB) Write(wc chan interface{}) {
	for v := range wc {
		fmt.Println(v)
	}
}
func?main()?{
???r?:=?&ReadFromFile{
??????path:?"/tmp/access.log",
???}
???w?:=?&WriterToinfluxDB{
??????influxDBDsn:?"username$password",
???}
???lp?:=?&LogProcess{
??????rc:?make(chan?string),
??????wc:?make(chan?interface{}),
??????read:?r,
??????write:?w,
???}
???go?lp.read.Read(lp.rc)
???go?lp.Process()
???go?lp.write.Write(lp.wc)
???time.Sleep(?30?*?time.Second)
}
// 2021-09-19
// Reader remark (original: "這么多的代碼你是要干嘛呢"): what is all this code meant to do?