Re: Read/Write streams in parallel
- To: mathgroup at smc.vnet.net
- Subject: [mg109862] Re: Read/Write streams in parallel
- From: Zach Bjornson <bjornson at mit.edu>
- Date: Thu, 20 May 2010 06:38:28 -0400 (EDT)
Yes, there is a way. I can't take credit for this, though: I asked
Wolfram the same question when they first released the Parallel commands.

(Odd formatting below. Lines starting with > are input; the others are output.)

> LaunchKernels[]

{KernelObject[1, "local"], KernelObject[2, "local"],
 KernelObject[3, "local"], KernelObject[4, "local"],
 KernelObject[5, "local"], KernelObject[6, "local"],
 KernelObject[7, "local"], KernelObject[8, "local"]}

> offset = IntegerPart[FileByteCount["largetest"]/Length[Kernels[]]]

1686

> ParallelEvaluate[in = OpenRead["largetest"]]

{InputStream["largetest", 10], InputStream["largetest", 10],
 InputStream["largetest", 10], InputStream["largetest", 10],
 InputStream["largetest", 10], InputStream["largetest", 10],
 InputStream["largetest", 10], InputStream["largetest", 10]}

> DistributeDefinitions[offset]

> Table[With[{i = i}, ParallelSubmit[SetStreamPosition[in, offset*i]]], {i, 0, Length[Kernels[]]}];
> WaitAll[%]

{0, 1686, 3372, 5058, 6744, 8430, 10116, 11802, 13488}

> ParallelEvaluate[Read[in, Table[Character, {offset}]]][[1 ;; 3, 1 ;; 20]]

{{"y", "a", "4", "4", "b", " ", "h", "c", "g", "f",
  "c", "3", "g", "c", "4", "c", "y", "1", "h", "4"},
 {"3", "y", "y", "3", "1", "3", "e", "g", "3", "2",
  "3", "h", "q", "b", "2", "e", "a", "b", "1", "a"},
 {"g", "e", "q", "e", "3", "d", "4", "h", " ", "3",
  "f", "1", "f", "c", "3", "3", "3", "z", "4", "q"}}

> ParallelEvaluate[Close[in]]

{"largetest", "largetest", "largetest", "largetest",
 "largetest", "largetest", "largetest", "largetest"}

On 5/19/2010 8:13 PM, ChrisL wrote:
> For a future project, I'll need to read and write some streams in
> parallel. I've started playing with a simple example and can't make it
> work.
> What I want to do is to read an entry from a stream ('testfile') and
> write its value to another stream ('testfile2'). This is
> straightforward with one processor but in parallel, I get either a
> wrong result or error messages.
> For example (testfile contains the first 100 prime numbers):
> ParallelEvaluate[sr = OpenRead@FileNameJoin[{$TemporaryDirectory,
> "testfile"}]]
> ParallelEvaluate[wr = OpenAppend@FileNameJoin[{$TemporaryDirectory,
> "testfile2"}]]
> ParallelDo[
> x = Read[sr]; Print[{x, $KernelID}];
> Write[wr, {x, $KernelID}], {10}];
> ParallelEvaluate[Close[sr]]
> ParallelEvaluate[Close[wr]]
>
> testfile2 then contains:
> {2, 2}
> {2, 1}
> {3, 2}
> {3, 1}
> {5, 2}
> {5, 1}
> {7, 2}
> {7, 1}
> {11, 2}
> {11, 1}
> This is not satisfactory: the entries are read twice, once per
> processor. Obviously, it's because the two streams are opened by both
> processors in parallel. So we want the streams to be opened only once,
> and their 'pointers' (location of last entry read) to be updated
> concurrently.
> I try this instead:
> sr = OpenRead@FileNameJoin[{$TemporaryDirectory, "testfile"}]
> wr = OpenAppend@FileNameJoin[{$TemporaryDirectory, "testfile2"}]
> SetSharedVariable[sr, wr]
> ParallelDo[x = Read[sr]; Print[{x, $KernelID}];
> Write[wr, {x, $KernelID}], {4}];
> Close[sr]
> Close[wr]
>
> This time the two streams should be open only once and sync-ed thanks
> to SetSharedVariable. But testfile2 ends up being empty*, and
> Mathematica sends some error messages:
> InputStream["/tmp/testfile", 33] (* Value of sr *)
> OutputStream["/tmp/testfile2", 34] (* Value of wr *)
> (kernel 2) Read::openx: InputStream[/tmp/testfile,33] is not open. (*
> yes it is! *)
> (kernel 1) Read::openx: InputStream[/tmp/testfile,33] is not open. (*
> yes it is! *)
> etc.
>
> Is there any way I can make it work? The correct content for testfile2
> should be the list of the 10 first prime numbers, possibly in a
> different order, and the kernelID that wrote the entry.
>
> * if it starts from an empty file. Notice the OpenAppend (!=OpenWrite)
> Thank you very much in advance,
> Cheers,
> Chris
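
For the small task in the quoted message, one alternative that avoids touching
streams from the subkernels at all might look like the sketch below. The master
kernel does all the reading and writing and only the per-entry work runs in
parallel. This is only an illustration under assumptions: testfile is taken to
hold whitespace-separated numbers, the names data and results are made up here,
and the pure function {#, $KernelID} & merely stands in for whatever real
processing is needed.

```mathematica
(* Read the first 10 entries on the master kernel only. *)
data = ReadList[FileNameJoin[{$TemporaryDirectory, "testfile"}], Number, 10];

(* Do the per-entry work on the subkernels; $KernelID is evaluated there,
   so it records which kernel handled each entry. *)
results = ParallelMap[{#, $KernelID} &, data];

(* Write the results from the master kernel, one expression per line. *)
wr = OpenAppend[FileNameJoin[{$TemporaryDirectory, "testfile2"}]];
Scan[Write[wr, #] &, results];
Close[wr];
```

With this arrangement each entry is read exactly once, and the kernel ID in
each pair is that of the subkernel that processed the entry rather than one
that wrote it. It only makes sense when the data fits comfortably in the master
kernel; for a large file, the per-kernel stream offsets shown in the reply are
the better fit.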