2 回答

TA貢獻1900條經驗 獲得超5個贊
使用以下內容將字符串通道轉換為讀取器:
type chanReader struct {
c chan string
buf string
}
func (r *chanReader) Read(p []byte) (int, error) {
// Fill the buffer when we have no data to return to the caller
if len(r.buf) == 0 {
var ok bool
r.buf, ok = <-r.c
if !ok {
// Return eof on channel closed
return 0, io.EOF
}
}
n := copy(p, r.buf)
r.buf = r.buf[n:]
return n, nil
}
像這樣使用它:
r := csv.NewReader(&chanReader{c: feederChan})
for {
a, err := r.Read()
if err != nil {
// handle error, break out of loop
}
// do something with a
}
如果應用程序假定換行符分隔從通道接收的值,則將換行符附加到每個接收到的值:
...
var ok bool
r.buf, ok = <-r.c
if !ok {
// Return eof on channel closed
return 0, io.EOF
}
r.buf += "\n"
...
復制+= "\n"字符串。如果這不能滿足應用程序的效率要求,則引入一個新字段來管理行分隔符。
type chanReader struct {
c chan string // source of lines
buf string // the current line
nl bool // true if line separator is pending
}
func (r *chanReader) Read(p []byte) (int, error) {
// Fill the buffer when we have no data to return to the caller
if len(r.buf) == 0 && !r.nl {
var ok bool
r.buf, ok = <-r.c
if !ok {
// Return eof on channel closed
return 0, io.EOF
}
r.nl = true
}
// Return data if we have it
if len(r.buf) > 0 {
n := copy(p, r.buf)
r.buf = r.buf[n:]
return n, nil
}
// No data, return the line separator
n := copy(p, "\n")
r.nl = n == 0
return n, nil
}
另一種方法是按照問題評論中的建議,使用 io.Pipe 和 goroutine 將通道轉換為 io.Reader。這種方法的第一步是:
var nl = []byte("\n")
func createChanReader(c chan string) io.Reader {
r, w := io.Pipe()
go func() {
defer w.Close()
for s := range c {
io.WriteString(w, s)
w.Write(nl)
}
}
}()
return r
}
像這樣使用它:
r := csv.NewReader(createChanReader(feederChan))
for {
a, err := r.Read()
if err != nil {
// handle error, break out of loop
}
// do something with a
}
當應用程序在將管道讀取到 EOF 之前退出循環時, io.Pipe 解決方案的第一遍會泄漏 goroutine。應用程序可能會提前中斷,因為 CSV 閱讀器檢測到語法錯誤,應用程序由于程序員錯誤或任何其他原因而崩潰。
要修復 goroutine 泄漏,請在寫入錯誤時退出寫入 goroutine,并在完成讀取后關閉管道讀取器。
var nl = []byte("\n")
func createChanReader(c chan string) *io.PipeReader {
r, w := io.Pipe()
go func() {
defer w.Close()
for s := range c {
if _, err := io.WriteString(w, s); err != nil {
return
}
if _, err := w.Write(nl); err != nil {
return
}
}
}()
return r
}
像這樣使用它:
cr := createChanReader(feederChan)
defer cr.Close() // Required for goroutine cleanup
r := csv.NewReader(cr)
for {
a, err := r.Read()
if err != nil {
// handle error, break out of loop
}
// do something with a
}

TA貢獻1825條經驗 獲得超4個贊
我最終還是使用了 io.Pipe() “正如 mh-cbon 提到的那樣”,它更簡單并且看起來更有效(如下所述):
rp, wp := io.Pipe()
go func() {
? ? defer wp.Close()
? ? for i := range feederChan {
? ? ? ? fmt.Fprintln(wp, i)
? ? }
}()
r := csv.NewReader(rp)
for { // keep reading
? ? a, err := r.Read()
? ? if err == io.EOF {
? ? ? ? break
? ? }
? ? // do stuff with 'a'
? ? // ...
}
io.Pipe() 是同步的,并且應該相當高效:它將數據從寫入器通過管道傳輸到讀取器;我將 csv.NewReader() 提供給讀者部分,并創建了一個 goroutine,將 chan 寫入到作者部分。
多謝。
- 2 回答
- 0 關注
- 138 瀏覽
添加回答
舉報