Re: Read/Write streams in parallel
Zach Bjornson
Thu, 20 May 2010 06:38:28 -0400 (EDT)
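Yes, there is a way, though I can't take credit for it: I asked
Wolfram the same question when they first released the Parallel
commands. The idea is to open a separate stream on each kernel and move
each one to a different byte offset, so that every kernel reads its own
chunk of the file. (The formatting below is a bit odd: lines starting
with > are input, the rest are output.)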
> LaunchKernels[]
{KernelObject[1, "local"], KernelObject[2, "local"],
KernelObject[3, "local"], KernelObject[4, "local"],
KernelObject[5, "local"], KernelObject[6, "local"],
KernelObject[7, "local"], KernelObject[8, "local"]}
> offset = IntegerPart[FileByteCount["largetest"]/Length[Kernels[]]]
1686
> ParallelEvaluate[in = OpenRead["largetest"]]
{InputStream["largetest", 10], InputStream["largetest", 10],
InputStream["largetest", 10], InputStream["largetest", 10],
InputStream["largetest", 10], InputStream["largetest", 10],
InputStream["largetest", 10], InputStream["largetest", 10]}
> DistributeDefinitions[offset]
> Table[With[{i = i},
    ParallelSubmit[SetStreamPosition[in, offset*i]]], {i, 0,
    Length[Kernels[]] - 1}];
> WaitAll[%]
{0, 1686, 3372, 5058, 6744, 8430, 10116, 11802}
> ParallelEvaluate[Read[in, Table[Character, {offset}]]][[1 ;; 3,
    1 ;; 20]]
{{"y", "a", "4", "4", "b", " ", "h", "c", "g", "f", "c", "3", "g",
"c", "4", "c", "y", "1", "h", "4"}, {"3", "y", "y", "3", "1", "3",
"e", "g", "3", "2", "3", "h", "q", "b", "2", "e", "a", "b", "1",
"a"}, {"g", "e", "q", "e", "3", "d", "4", "h", " ", "3", "f", "1",
"f", "c", "3", "3", "3", "z", "4", "q"}}
> ParallelEvaluate[Close[in]]
{"largetest", "largetest", "largetest", "largetest", "largetest", \
"largetest", "largetest", "largetest"}
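Two caveats about the session above: ParallelSubmit hands each task to
whichever kernel happens to be free, so nothing guarantees that every
kernel receives exactly one offset, and IntegerPart drops up to
(number of kernels - 1) trailing bytes that no kernel reads. Below is an
untested sketch of the same technique that pins offset k to kernel k with
the two-argument form ParallelEvaluate[expr, kernel] and hands the
remainder to the last kernel. It assumes a plain ASCII file (one byte =
one Character) and that LaunchKernels[] has already been run; the names
chunkLength and chunks are just illustrative.

```mathematica
(* Sketch: read "largetest" as one disjoint chunk per kernel.
   Assumes a plain ASCII file (1 byte = 1 Character) and that
   LaunchKernels[] has already been run. *)
kernels = Kernels[];
n = Length[kernels];
size = FileByteCount["largetest"];
offset = IntegerPart[size/n];

(* every kernel opens its own, independent stream *)
ParallelEvaluate[in = OpenRead["largetest"]];

(* pin chunk k to kernel k; the last kernel also takes the bytes
   that IntegerPart dropped *)
Do[
  With[{pos = offset*(k - 1),
    len = If[k == n, size - offset*(n - 1), offset]},
   (* With splices the numbers in: ParallelEvaluate holds its first
      argument, and k is undefined on the subkernels *)
   ParallelEvaluate[SetStreamPosition[in, pos]; chunkLength = len,
    kernels[[k]]]],
  {k, n}];

(* all kernels now read their own chunks in parallel *)
chunks = ParallelEvaluate[ReadList[in, Character, chunkLength], kernels];
ParallelEvaluate[Close[in]];
```

ReadList[in, Character, len] reads at most len characters, so it also
stops cleanly at end of file, and StringJoin @@ Flatten[chunks] puts the
file contents back together in order.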
On 5/19/2010 8:13 PM, ChrisL wrote:
> For a future project, I'll need to read and write some streams in
> parallel. I've started playing with a simple example and can't make it
> work.
> What I want to do is to read an entry from a stream ('testfile') and
> write its value to another stream ('testfile2'). This is
> straightforward with one processor, but in parallel I get either a
> wrong result or error messages.
> For example (testfile contains the first 100 prime numbers):
> ParallelEvaluate[sr = OpenRead@FileNameJoin[{$TemporaryDirectory,
> "testfile"}]]
> ParallelEvaluate[wr = OpenAppend@FileNameJoin[{$TemporaryDirectory,
> "testfile2"}]]
> ParallelDo[
> x = Read[sr]; Print[{x, $KernelID}];
> Write[wr, {x, $KernelID}], {10}];
> ParallelEvaluate[Close[sr]]
> ParallelEvaluate[Close[wr]]
>
> testfile2 then contains:
> {2, 2}
> {2, 1}
> {3, 2}
> {3, 1}
> {5, 2}
> {5, 1}
> {7, 2}
> {7, 1}
> {11, 2}
> {11, 1}
> This is not satisfactory: each entry is read twice, once per
> kernel. Obviously, this is because each kernel opens its own copy of
> both streams. So we want the streams to be opened only once, and their
> 'pointers' (the position of the last entry read) to be kept in sync
> across kernels.
> I tried this instead:
> sr = OpenRead@FileNameJoin[{$TemporaryDirectory, "testfile"}]
> wr = OpenAppend@FileNameJoin[{$TemporaryDirectory, "testfile2"}]
> SetSharedVariable[sr, wr]
> ParallelDo[x = Read[sr]; Print[{x, $KernelID}];
> Write[wr, {x, $KernelID}], {4}];
> Close[sr]
> Close[wr]
>
> This time the two streams should be opened only once and kept in sync
> by SetSharedVariable. But testfile2 ends up empty*, and
> Mathematica prints these error messages:
> InputStream["/tmp/testfile", 33] (* Value of sr *)
> OutputStream["/tmp/testfile2", 34] (* Value of wr *)
> (kernel 2) Read::openx: InputStream[/tmp/testfile,33] is not open. (*
> yes it is! *)
> (kernel 1) Read::openx: InputStream[/tmp/testfile,33] is not open. (*
> yes it is! *)
> etc.
>
> Is there any way I can make it work? The correct content of testfile2
> should be the first 10 prime numbers, possibly in a different order,
> each paired with the $KernelID of the kernel that wrote it.
>
> * If it starts from an empty file. Note that I use OpenAppend, not OpenWrite.
> Thank you very much in advance,
> Cheers,
> Chris
>
>
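The reply above only covers reading. As for the SetSharedVariable attempt
in the quoted question: sharing sr and wr shares only the stream objects,
so each subkernel presumably calls Read on a stream that is open only on
the master kernel, which would explain the Read::openx messages. One
possible fix, sketched here and untested, is to keep all I/O on the
master kernel through SetSharedFunction, since calls to a shared function
made on a subkernel are evaluated on the master. readEntry and writeEntry
are hypothetical helper names; the sketch assumes, as in the question,
that testfile holds the first 100 primes and testfile2 starts out empty.

```mathematica
(* Sketch: sr and wr stay open only on the master kernel; subkernels
   reach them through shared functions evaluated on the master.
   readEntry and writeEntry are hypothetical helper names. *)
sr = OpenRead@FileNameJoin[{$TemporaryDirectory, "testfile"}];
wr = OpenAppend@FileNameJoin[{$TemporaryDirectory, "testfile2"}];

readEntry[] := Read[sr];
writeEntry[entry_] := Write[wr, entry];
SetSharedFunction[readEntry, writeEntry];

ParallelDo[
  (* x and id are bound on the subkernel, so the kernel ID recorded is
     the subkernel's, not the master's *)
  With[{x = readEntry[], id = $KernelID},
   writeEntry[{x, id}]],
  {10}];

Close[sr];
Close[wr];
```

Because the master handles one shared-function call at a time, each entry
is read exactly once and each Write lands as a whole line. The price is
that all I/O is serialized through the master, so this only pays off when
each kernel does substantial work between reading and writing.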
