package?main
import?(
???"strings"
???"fmt"
???"time"
???"os"
???"bufio"
???"io"
)
type?Reader3?interface?{
???read(rc?chan?[]byte)
}
type?Writer3?interface?{
???write(wc?chan?string)
}
type?ReadFromFile3?struct?{
???path?string
}
type?WriteToInfluxDB3?struct?{
???influxDBDsn?string
}
type?LogProcess3?struct?{
???rc?????chan?[]byte
???wc?????chan?string
???reader?Reader3
???writer?Writer3
}
//?1?讀取模塊
func?(r?*ReadFromFile3)?read(rc?chan?[]byte)?{
???fmt.Println("Begin?read?File")
???file,?e?:=?os.OpenFile(r.path,?os.O_WRONLY|os.O_APPEND,?os.ModePerm)
???if?e?!=?nil?{
??????panic(fmt.Sprintf("open?file?error:%s",?e.Error()))
???}
???//?從文件末尾開始逐行讀取文件內容
???file.Seek(0,?2)
???rd?:=?bufio.NewReader(file)
???for?{
??????line,?err?:=?rd.ReadBytes('\n')
??????if?err?!=?io.EOF?{?//?到結尾
?????????fmt.Println(line)
?????????time.Sleep(500?*?time.Millisecond)
?????????continue
??????}?else?if?err?!=?nil?{
?????????panic(fmt.Sprintf("ReadBytes?error:%s",?err.Error()))
??????}
??????fmt.Println(">?",line)
??????//?rc?<-?line
??????rc?<-?line[:len(line)-1]
???}
}
func?(l?*LogProcess3)?process()?{
???//?解析模塊
???
???for?v?:=?range?l.rc?{
??????l.wc?<-?strings.ToUpper(string(v))
???}
}
//?3?寫入模塊
func?(w?*WriteToInfluxDB3)?write(wc?chan?string)?{
???fmt.Println(">>?",*&wc)?//?>>??0xc4200760c0?這里為什么是地址呢?
???for?v?:=?range?wc?{
??????fmt.Printf(v)
???}
}
func?main()?{
???read?:=?&ReadFromFile3{
??????path:?"/Users/xyang/go_code/src/xyang.com/logcollect/data/access.log",
??????//path:?"data/access.log",
???}
???writer?:=?&WriteToInfluxDB3{
??????influxDBDsn:?"username=?&password=?",
???}
???lp?:=?&LogProcess3{
??????rc:?????make(chan?[]byte),
??????wc:?????make(chan?string),
??????reader:?read,
??????writer:?writer,
???}
???go?lp.reader.read(lp.rc)
???go?lp.process()
???go?lp.writer.write(lp.wc)
???time.Sleep(30?*?time.Second)
}
2019-05-13
改為:?