package?main
import?(
???"fmt"
???"strings"
???"time"
???"os"
???"bufio"
???"io"
)
// Reader is the input side of the pipeline: anything with a Read method
// that is handed the channel rc. In this file, ReadFromFile satisfies it
// by sending the lines it reads to rc.
type Reader interface {
	// Read is given the channel that the implementation works with.
	Read(rc chan string)
}
// Writer is the output side of the pipeline: anything with a Write method
// that is handed the channel wc. In this file, WriterToinfluxDB satisfies
// it by receiving values from wc until the channel is closed.
type Writer interface {
	// Write is given the channel that the implementation works with.
	Write(wc chan string)
}
type?LogProcess?struct?{
???rc??chan?string???//?in?file?get?message
???wc??chan?string???//?out?message?in?writer
????read?Reader
????write?Writer
}
func?(l?*LogProcess)?Process()??{
???for?v?:=?range?l.rc?{
??????l.wc?<-?strings.ToUpper(string(v))
???}
}
// ReadFromFile is a Reader that follows a file on disk.
type ReadFromFile struct {
	path string // file storage path
}

// Read is the reading stage of the pipeline. It opens r.path, seeks to the
// end of the file and then sends every line appended afterwards to rc, with
// surrounding whitespace removed by strings.TrimSpace.
//
// The loop never returns on its own: at end of file it sleeps briefly and
// retries, so the call blocks for as long as it runs. A failure to open,
// seek or read the file causes a panic, because the Reader interface gives
// Read no way to return an error.
func (r *ReadFromFile) Read(rc chan string) {
	// NOTE(review): 500µs is a very tight polling interval; confirm whether
	// milliseconds was intended.
	const pollInterval = 500 * time.Microsecond

	file, err := os.Open(r.path)
	if err != nil {
		panic(fmt.Sprintf("open file error: %s", err.Error()))
	}
	defer file.Close()

	// Start at the end of the file so that only lines appended from now on
	// are reported.
	if _, err := file.Seek(0, io.SeekEnd); err != nil {
		panic(fmt.Sprintf("seek file error: %s", err.Error()))
	}

	rd := bufio.NewReader(file)
	// partial holds the beginning of a line whose terminating '\n' has not
	// been written to the file yet.
	var partial string
	for {
		chunk, err := rd.ReadString('\n')
		if err == io.EOF {
			// ReadString hands back the bytes it consumed before hitting
			// EOF. Keep them so a line written in several pieces is
			// delivered whole instead of being truncated.
			partial += chunk
			time.Sleep(pollInterval)
			continue
		}
		if err != nil {
			panic(fmt.Sprintf("ReadString error: %s", err.Error()))
		}
		rc <- strings.TrimSpace(partial + chunk)
		partial = ""
	}
}
// WriterToinfluxDB is the Writer used by main. It stores an InfluxDB DSN,
// but Write does not read that field: it only prints what it receives.
type WriterToinfluxDB struct {
	influxDBDsn string // influxDB dsn
}

// Write receives values from wc and prints each one on its own line with
// fmt.Println. It returns once wc has been closed and drained.
func (w *WriterToinfluxDB) Write(wc chan string) {
	for {
		msg, ok := <-wc
		if !ok {
			return
		}
		fmt.Println(msg)
	}
}
func?main()?{
???r?:=?&ReadFromFile{
??????path:?"/tmp/access.log",
???}
???w?:=?&WriterToinfluxDB{
??????influxDBDsn:?"username$password",
???}
???lp?:=?&LogProcess{
??????rc:?make(chan?string),
??????wc:?make(chan?string),
??????read:?r,
??????write:?w,
???}
???go?lp.read.Read(lp.rc)
???go?lp.Process()
???go?lp.write.Write(lp.wc)
???time.Sleep(?30?*?time.Second)
}
// 2020-01-25
// 嘎哈哈哈哈哈哈哈哈