MathGroup Archive 2010



Re: Read/Write streams in parallel

  • To: mathgroup at smc.vnet.net
  • Subject: [mg109862] Re: Read/Write streams in parallel
  • From: Zach Bjornson <bjornson at mit.edu>
  • Date: Thu, 20 May 2010 06:38:28 -0400 (EDT)

Yes, there is a way. I can't take credit for it, though: I asked
Wolfram the same question when they first released the Parallel
commands. (The formatting below is a little odd: lines starting with > are inputs; the rest are outputs.)

 > LaunchKernels[]

{KernelObject[1, "local"], KernelObject[2, "local"],
  KernelObject[3, "local"], KernelObject[4, "local"],
  KernelObject[5, "local"], KernelObject[6, "local"],
  KernelObject[7, "local"], KernelObject[8, "local"]}

 > offset = IntegerPart[FileByteCount["largetest"]/Length[Kernels[]]]

1686
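IntegerPart rounds the chunk size down, so when the byte count is not a multiple of the number of kernels, the last few bytes of the file (up to one fewer than the number of kernels) fall outside every chunk. A minimal sketch of chunk boundaries that cover the whole file; the name chunkBounds is hypothetical, not a built-in:

```mathematica
(* chunkBounds is a hypothetical helper: it returns n + 1 byte offsets;
   chunk k starts at offset k and ends just before offset k + 1, so the
   chunk lengths (Differences) always add up to size *)
chunkBounds[size_Integer?NonNegative, n_Integer?Positive] :=
 Floor[Range[0, n] size/n]

chunkBounds[10, 3]                (* {0, 3, 6, 10} *)
Differences[chunkBounds[10, 3]]   (* {3, 3, 4} *)
```

Kernel k (counting from 1) would then seek to the k-th bound and read up to the (k + 1)-th, instead of reading a fixed offset characters.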

 > ParallelEvaluate[in = OpenRead["largetest"]]

{InputStream["largetest", 10], InputStream["largetest", 10],
  InputStream["largetest", 10], InputStream["largetest", 10],
  InputStream["largetest", 10], InputStream["largetest", 10],
  InputStream["largetest", 10], InputStream["largetest", 10]}

 > DistributeDefinitions[offset]

 > tasks = Table[With[{i = i},
     ParallelSubmit[SetStreamPosition[in, offset*i]]], {i, 0,
     Length[Kernels[]] - 1}];

 > WaitAll[tasks]

{0, 1686, 3372, 5058, 6744, 8430, 10116, 11802}

 > ParallelEvaluate[Read[in, Table[Character, {offset}]]][[1 ;; 3,
   1 ;; 20]]

{{"y", "a", "4", "4", "b", " ", "h", "c", "g", "f", "c", "3", "g",
   "c", "4", "c", "y", "1", "h", "4"}, {"3", "y", "y", "3", "1", "3",
   "e", "g", "3", "2", "3", "h", "q", "b", "2", "e", "a", "b", "1",
   "a"}, {"g", "e", "q", "e", "3", "d", "4", "h", " ", "3", "f", "1",
   "f", "c", "3", "3", "3", "z", "4", "q"}}

 > ParallelEvaluate[Close[in]]

{"largetest", "largetest", "largetest", "largetest",
  "largetest", "largetest", "largetest", "largetest"}
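Keeping one stream open per kernel ties each chunk to whichever kernel happened to receive a given ParallelSubmit task, and nothing guarantees exactly one task per kernel. A sketch of an alternative in which every task opens, positions, reads, and closes its own stream, so it no longer matters which kernel runs it. The helper readChunk is hypothetical, and the file is assumed to be single-byte text like "largetest", because SetStreamPosition counts bytes while Read[..., Character] counts characters:

```mathematica
(* readChunk is a hypothetical helper: read len characters starting at
   byte offset start, through a private stream that is closed again *)
readChunk[_String, _Integer, 0] := {}
readChunk[file_String, start_Integer, len_Integer?Positive] :=
 Module[{s = OpenRead[file], chars},
  SetStreamPosition[s, start];
  chars = Read[s, Table[Character, {len}]];
  Close[s];
  chars]

If[$KernelCount == 0, LaunchKernels[]];
(* absolute path, in case the subkernels use another working directory *)
file = AbsoluteFileName["largetest"];
n = $KernelCount;
(* n + 1 bounds; Floor spreads the remainder so every byte is covered *)
bounds = Floor[Range[0, n] FileByteCount[file]/n];
DistributeDefinitions[readChunk, file, bounds];

(* one task per chunk; ParallelTable returns the results in chunk order *)
chunks = ParallelTable[
   readChunk[file, bounds[[k]], bounds[[k + 1]] - bounds[[k]]], {k, n}];
```

StringJoin[chunks] then gives the whole file as one string, regardless of which kernel read which chunk.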





On 5/19/2010 8:13 PM, ChrisL wrote:
> For a future project, I'll need to read and write some streams in
> parallel. I've started playing with a simple example and can't make it
> work.
> What I want to do is read an entry from one stream ('testfile') and
> write its value to another stream ('testfile2'). This is
> straightforward with one processor, but in parallel I get either a
> wrong result or error messages.
> For example (testfile contains the first 100 prime numbers):
> ParallelEvaluate[sr = OpenRead@FileNameJoin[{$TemporaryDirectory,
> "testfile"}]]
> ParallelEvaluate[wr = OpenAppend@FileNameJoin[{$TemporaryDirectory,
> "testfile2"}]]
> ParallelDo[
> 	x = Read[sr]; Print[{x, $KernelID}];
> 	Write[wr, {x, $KernelID}], {10}];
> ParallelEvaluate[Close[sr]]
> ParallelEvaluate[Close[wr]]
>
> testfile2 then contains:
> {2, 2}
> {2, 1}
> {3, 2}
> {3, 1}
> {5, 2}
> {5, 1}
> {7, 2}
> {7, 1}
> {11, 2}
> {11, 1}
> This is not satisfactory: every entry is read twice, once per
> kernel. Obviously, this is because each kernel opens its own copy of
> both streams. So we want the streams to be opened only once, and
> their 'pointers' (the position of the last entry read) to be shared
> by all kernels.
> I try this instead:
> sr = OpenRead@FileNameJoin[{$TemporaryDirectory, "testfile"}]
> wr = OpenAppend@FileNameJoin[{$TemporaryDirectory, "testfile2"}]
> SetSharedVariable[sr, wr]
> ParallelDo[x = Read[sr]; Print[{x, $KernelID}];
>    Write[wr, {x, $KernelID}], {4}];
> Close[sr]
> Close[wr]
>
> This time the two streams should be open only once and synced thanks
> to SetSharedVariable. But testfile2 ends up empty*, and
> Mathematica prints some error messages:
> InputStream["/tmp/testfile", 33]  	(* Value of sr *)
> OutputStream["/tmp/testfile2", 34]  (* Value of wr *)
> (kernel 2) Read::openx: InputStream[/tmp/testfile,33] is not open. (*
> yes it is! *)
> (kernel 1) Read::openx: InputStream[/tmp/testfile,33] is not open. (*
> yes it is! *)
> etc.
>
> Is there any way I can make it work? The correct content for testfile2
> should be the first 10 prime numbers, possibly in a different order,
> each paired with the $KernelID of the kernel that wrote it.
>
> * If it starts from an empty file. Note the OpenAppend (not OpenWrite).
> Thank you very much in advance,
> Cheers,
> Chris
>
>    
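A sketch for the read/write question itself, assuming Mathematica 7 or later. The Read::openx messages arise because a stream belongs to the kernel that opened it: SetSharedVariable shares only the expression InputStream[...], which names a stream that is not open in the subkernels. SetSharedFunction instead sends calls made on a subkernel to the master kernel for evaluation, where both streams are open, so every Read advances the single shared position. The names readEntry, writeEntry, inFile, and outFile are hypothetical, and the block writes its own test input (the first 100 primes, as in the question) so it can run on its own:

```mathematica
If[$KernelCount == 0, LaunchKernels[]];
inFile = FileNameJoin[{$TemporaryDirectory, "testfile"}];
outFile = FileNameJoin[{$TemporaryDirectory, "testfile2"}];

(* test input: the first 100 primes, one per line *)
s = OpenWrite[inFile]; Scan[Write[s, #] &, Prime[Range[100]]]; Close[s];
If[FileExistsQ[outFile], DeleteFile[outFile]];  (* start from an empty testfile2 *)

(* both streams are opened on the master kernel only *)
sr = OpenRead[inFile];
wr = OpenAppend[outFile];

(* hypothetical helpers, defined on the master kernel *)
readEntry[] := Read[sr];
writeEntry[e_] := Write[wr, e];
(* calls to these from a subkernel are evaluated on the master kernel *)
SetSharedFunction[readEntry, writeEntry];

ParallelDo[
  (* readEntry[] and $KernelID are evaluated on the subkernel running this iteration *)
  With[{x = readEntry[], id = $KernelID},
   writeEntry[{x, id}]],
  {10}];

Close[sr];
Close[wr];
```

testfile2 should then hold ten lines of the form {p, kernelID}, with each of the primes 2 through 29 read exactly once, possibly out of order. Because every read and write is a round trip to the master kernel, this design only pays off when the work done per entry on the subkernels is expensive.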

