如何从goroutine的通道中持续接收数据

5

我是Golang的初学者。我做了一个关于Go通道的练习。我在主goroutine中打开并读取文件数据,然后通过通道将数据传递给第二个goroutine以保存到另一个文件。 我的代码如下:

  func main() {
   f, err := os.OpenFile("test.go", os.O_RDONLY, 0600)
   ch := make(chan []byte)
   buf := make([]byte, 10)
   bytes_len, err := f.Read(buf)
   fmt.Println("ReadLen:", bytes_len)
   if err != nil {
      fmt.Println("Error: ", err)
      return
   }
   go WriteFile(ch)
   for {
      ch<-buf
      bytes_len, err = f.Read(buf)
      if err != nil {
          fmt.Println("error=", err)
          break
      }
      if bytes_len < 10 {
          ch<-buf[:bytes_len]
          fmt.Println("Finished!")
          break
      }
   }
   time.Sleep(1e9)
   f.Close()
 }

  func WriteFile(ch <-chan []byte) {
    fmt.Println("* begin!")
    f, err := os.OpenFile("/home/GoProgram/test/test.file",  os.O_RDWR|os.O_APPEND|os.O_CREATE, 0660)
    if err != nil {
       fmt.Println("* Error:", err)
       return
    }
    /* Method1:  use the "select" will write to target file OK, but it is too slow!!!
    for {
      select {
         case bytes, ok:= <-ch:
            if ok {
              f.Write(bytes)
            } else {
              fmt.Println("* file closed!")
              break
            }
         default:
            fmt.Println("* waiting data!")
      }
    } \*/
    // Method 2: use "for {if}", this will get messed text in target file, not identical with the source file.
    for {
      if bytes, ok := <-ch; ok {
            f.Write(bytes)
            fmt.Println("* buff=", string(bytes))
            bytes = nil
            ok = false
      } else {
        fmt.Println("** End ", string(bytes), "  ", ok)
        break
      }
    }

    /* Method 3: use "for range", this will get messed text like in method2
    for data:= range ch {
         f.Write(data)
       //fmt.Println("* Data:", string(data))
    }
    \*/
    f.Close()
}

我的问题是为什么Method2和Method3在目标文件中会出现混乱的文本?我该如何解决?
2个回答

4

由于读取器和写入器共享的缓冲区存在竞争,导致Method2和Method3出现混乱文本。

以下是程序可能的语句执行顺序:

 R: bytes_len, err = f.Read(buf)  
 R: ch<-buf[:bytes_len]
 W: bytes, ok := <-ch; ok
 R: bytes_len, err = f.Read(buf)  // this writes over buffer
 W: f.Write(bytes)                // writes data from second read

使用竞态检测器运行你的程序。它会为你标记出问题。

解决此问题的一种方法是复制数据。例如,从读取的字节创建字符串并将其发送到通道。

另一个选择是使用io.Pipe连接 goroutines。一个 goroutine 从源读取并写入管道。另一个 goroutine 从管道读取并写入目标。 管道会处理同步问题。


是的,Slice是一种引用类型,因此读写会产生竞争。当使用值类型如“string”时,结果是好的。 - Henry John
当我使用sync.Mutex来同步这两个goroutine中的读写操作时,程序总是会陷入死锁。 - Henry John

1
为了使用你在注释中提到的Method2Method3中的for循环代码段,你需要使用一个buffered通道。 func main中的循环没有与在WriteFile中监听通道的循环同步的机制,这就是为什么目标文件中的文本会混乱的原因。
另一方面,向缓冲通道发送只有在缓冲区满时才阻塞。接收则在缓冲区为空时阻塞。因此,通过初始化长度为一的通道缓冲区,您可以使用Method1和/或Method2。唯一剩下的就是在完成后记得关闭通道。
func main() {
    f, _ := os.OpenFile("test.txt", os.O_RDONLY, 0600)
    defer f.Close()
    ch := make(chan []byte, 1) // use second argument to make to give buffer length 1
    buf := make([]byte, 10)
    go WriteFile(ch)
    for {
        ch <- buf
        byteLen, err := f.Read(buf)
        if err != nil {
            break
        }
        if byteLen < 10 {
            ch <- buf[:byteLen]
            break
        }
    }
    close(ch) //close the channel when you done
}

func WriteFile(ch <-chan []byte) {
    f, err := os.OpenFile("othertest.txt", os.O_RDWR|os.O_APPEND|os.O_CREATE, 0660)
    defer f.Close()
    if err != nil {
        fmt.Println("* Error:", err)
        return
    }

    //Method 3: use "for range"
    for data := range ch {
        f.Write(data)
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接