Thread: Parallel programming in C

  1. #1
    Registered User
    Join Date
    Feb 2013
    Posts
    45

    Parallel programming in C

    Hi
    I have some code that uses MPI broadcast, and I want to change it to use asynchronous point-to-point communication. I am a newbie in parallel programming.
    Can someone give me an example implementation of the same simple program done both with broadcast and with point-to-point?

  2. #2
    Registered User
    Join Date
    May 2012
    Posts
    505
    Quote Originally Posted by gevni View Post
    Hi
    I have some code that uses MPI broadcast, and I want to change it to use asynchronous point-to-point communication. I am a newbie in parallel programming.
    Can someone give me an example implementation of the same simple program done both with broadcast and with point-to-point?
    I don't have an MPI installation with me, so it's hard to give a tested example.

    MPI broadcast (MPI_Bcast) sends one message from a single process, usually rank zero, to every process in the group. MPI_Send sends a message from one process to another.
    The answer to the question depends on where the information resides. Probably you've done a lot of calculations in parallel, gathered the results back to the root, and the root is now doling the results out to everyone for further processing. If you want to move to point-to-point communication, you need to be able to work out which bits of that result each of the other processes actually needs. There's no general answer to that.
    You can rewrite MPI_Bcast in terms of MPI_Send and MPI_Recv -- roughly as in the sketch below -- but that in itself gains you nothing.
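    Off the top of my head (untested, for the reason above), the same "give every process one int from rank 0" step looks like this done both ways:
    Code:
    #include <stdio.h>
    #include <mpi.h>
    
    int main(int argc, char *argv[])
    {
        int rank, size, value = 0, i;
    
        MPI_Init(&argc, &argv);
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
        MPI_Comm_size(MPI_COMM_WORLD, &size);
    
        /* Version 1: collective broadcast from rank 0 to everyone. */
        if (rank == 0)
            value = 123;
        MPI_Bcast(&value, 1, MPI_INT, 0, MPI_COMM_WORLD);
    
        /* Version 2: the same distribution done with point-to-point calls. */
        if (rank == 0) {
            value = 123;
            for (i = 1; i < size; i++)
                MPI_Send(&value, 1, MPI_INT, i, 0, MPI_COMM_WORLD);
        } else {
            MPI_Recv(&value, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        }
    
        printf("rank %d has value %d\n", rank, value);
        MPI_Finalize();
        return 0;
    }
    The broadcast version is both shorter and usually faster, since the MPI implementation can use a tree of messages internally.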
    I'm the author of MiniBasic: How to write a script interpreter and Basic Algorithms
    Visit my website for lots of associated C programming resources.
    https://github.com/MalcolmMcLean


  3. #3
    Ticked and off
    Join Date
    Oct 2011
    Location
    La-la land
    Posts
    1,728
    Instead of broadcasting, you allocate an array of MPI_Request (and MPI_Status) structures, one for each simultaneous asynchronous operation. You post the operations using the MPI_I...() variants, and continue work as usual.

    Note: You must not modify or discard the data until the transfers have completed!

    When you need the results of the transfers (or just wish to wait for them to complete), use MPI_Waitall() on the whole array, or MPI_Wait() on a specific transfer -- each transfer being associated with an MPI_Request structure, and its completion status with an MPI_Status structure.

    That's about it. Here's the example:
    Code:
    #include <stdlib.h>
    #include <stdio.h>
    #include <sys/time.h>
    #include <time.h>
    #include <mpi.h>
    
    /* Tags are used to distinguish simultaneous transfers between a pair of processes.
     * This example uses two async sends and one async receive between each
     * non-root process and root process.
    */
    #define TAG_SEND1 1
    #define TAG_SEND2 2
    #define TAG_RECV  3
    
    #define FATAL(message...) do { \
                                  fflush(stdout); \
                                  fprintf(stderr, message); \
                                  fflush(stderr); \
                                  MPI_Abort(MPI_COMM_WORLD, 1); \
                                  return 1; \
                              } while (0)
    
    char *now(void)
    {
        static char    buffer[128];
        struct timeval tv;
        struct tm     *gm;
    
        gettimeofday(&tv, NULL);
        gm = gmtime(&tv.tv_sec);
    
        sprintf(buffer, "%d:%d:%d.%06d UTC", gm->tm_hour, gm->tm_min, gm->tm_sec, (int)tv.tv_usec);
        return buffer;
    }
    
    #define PRINT(stuff...) do { \
                                struct timeval tv; \
                                struct tm     *gm; \
                                gettimeofday(&tv, NULL); \
                                gm = gmtime(&tv.tv_sec); \
                                fprintf(stdout, "%02d:%02d:%02d.%06d UTC on process %d: ", gm->tm_hour, gm->tm_min, gm->tm_sec, (int)tv.tv_usec, process); \
                                fprintf(stdout, stuff); \
                                fflush(stdout); \
                            } while (0)
    
    int main(int argc, char *argv[])
    {
        int  process, processes, n;
    
        if (MPI_Init(&argc, &argv)) {
            fprintf(stderr, "No MPI.\n\n");
            return 1;
        }
    
        if (MPI_Comm_size(MPI_COMM_WORLD, &processes) ||
            MPI_Comm_rank(MPI_COMM_WORLD, &process) ||
            process < 0 || process >= processes)
            FATAL("Invalid MPI process information.\n");
    
        if (process == 0) {
            MPI_Request *req;
            MPI_Status  *rok;
            int         *sent1, *sent2, *recvd;
    
            req = malloc(3 * (processes-1) * sizeof *req);
            rok = malloc(3 * (processes-1) * sizeof *rok);
            sent1 = malloc(processes * sizeof *sent1);
            sent2 = malloc(processes * sizeof *sent2);
            recvd = malloc(processes * sizeof *recvd);
            if (!req || !rok || !sent1 || !sent2 || !recvd)
                FATAL("Out of memory.\n");
    
            /* Fill sent1 with process numbers */
            for (n = 1; n < processes; n++)
                sent1[n] = n;
    
            /* Fill sent2 with 1 + (1 for odd processes) */
            for (n = 1; n < processes; n++)
                sent2[n] = 1 + (n % 2);
    
            /* Note: we fill the req array completely, without leaving holes;
             * for sent1, sent2 and recvd we leave [0] unused for simplicity. */
    
            PRINT("Ready to send.\n");
    
            /* Start sending sent1 entries, */
            for (n = 1; n < processes; n++)
                if (MPI_Isend(&(sent1[n]), 1, MPI_INT, n, TAG_SEND1, MPI_COMM_WORLD, &(req[n - 1])))
                    FATAL("Error initiating async send (1) from process 0 to process %d.\n", n);
    
            PRINT("First batch sent.\n");
    
            /* sending sent2 entries, */
            for (n = 1; n < processes; n++)
                if (MPI_Isend(&(sent2[n]), 1, MPI_INT, n, TAG_SEND2, MPI_COMM_WORLD, &(req[n + processes - 2])))
                    FATAL("Error initiating async send (2) from process 0 to process %d.\n", n);
    
            PRINT("Second batch sent.\n");
    
            /* and receiving the recvd entries. */
            for (n = 1; n < processes; n++)
                if (MPI_Irecv(&(recvd[n]), 1, MPI_INT, n, TAG_RECV, MPI_COMM_WORLD, &(req[n + 2*processes - 3])))
                    FATAL("Error initiating async receive from process %d to process 0.\n", n);
    
            /* The sends and receives will proceed in the background.
             * The processes can post their receives and sends in any order they like, too. */
            PRINT("Waiting for communications to complete.\n");
    
            /* Because we packed all 3*processes-3 (= 3*(processes-1)) requests into one array consecutively,
             * we can now use MPI_Waitall() to wait for all of them to complete. */
            if (MPI_Waitall(3*(processes-1), req, rok))
                FATAL("Error on process 0 when waiting for transfers to complete.\n");
    
            /* Verify all transfers were successful. */
            for (n = 0; n < 3*(processes-1); n++)
                if (rok[n].MPI_ERROR)
                    FATAL("Async I/O failed on process 0.\n");
    
            /* Output the per-process results. */
            for (n = 1; n < processes; n++)
                PRINT("Result from process %d: OP(%d, %d) = %d\n", n, sent1[n], sent2[n], recvd[n]);
    
            /* Done. */
            free(req);
            free(rok);
            free(sent1);
            free(sent2);
            free(recvd);
    
        } else {
            MPI_Request recvreq[2];
            MPI_Status  recvok[2];
            int         recv1, recv2, sent;
    
            PRINT("Ready to receive.\n");
    
            /* Non-root processes do two asynchronous receives. */
            if (MPI_Irecv(&recv1, 1, MPI_INT, 0, TAG_SEND1, MPI_COMM_WORLD, &(recvreq[0])))
                FATAL("Error on process %d trying to initiate async receive (1) from process 0.\n", process);
            if (MPI_Irecv(&recv2, 1, MPI_INT, 0, TAG_SEND2, MPI_COMM_WORLD, &(recvreq[1])))
                FATAL("Error on process %d trying to initiate async receive (2) from process 0.\n", process);
    
            PRINT("Preparing.\n");
    
            /* At this point, we need recv1 and recv2 for calculations,
             * so we must wait for both to complete. */
            if (MPI_Waitall(2, recvreq, recvok))
                FATAL("Error on process %d when waiting for receives to complete.\n", process);
    
            /* Verify transfers succeeded. */
            for (n = 0; n < 2; n++)
                if (recvok[n].MPI_ERROR)
                    FATAL("Error on process %d: async receive %d failed.\n", process, n + 1);
    
            /* Just do a frigging multiplication. */
            sent = recv1 * recv2;
    
            PRINT("Received %d and %d, responding with %d.\n", recv1, recv2, sent);
    
            /* Async-send the response. */
            if (MPI_Isend(&sent, 1, MPI_INT, 0, TAG_RECV, MPI_COMM_WORLD, &recvreq[0]))
                FATAL("Error on process %d trying to async send response to process 0.\n", process);
    
            /* This process is now ready to do cleanup work. */
            PRINT("Cleanup.\n");
    
            /* Wait for the async response to complete. */
            if (MPI_Wait(&recvreq[0], &recvok[0]))
                FATAL("Error on process %d when waiting for send to complete.\n", process);
            if (recvok[0].MPI_ERROR)
                FATAL("Error on process %d: async send failed.\n", process);
        }
    
        /* Wait for everyone to get this far, before finalizing. */
        if (MPI_Barrier(MPI_COMM_WORLD))
            FATAL("Barrier failed on process %d.\n", process); 
    
        if (MPI_Finalize()) {
            fprintf(stderr, "Error finalizing MPI.\n");
            return 1;
        }
    
        return 0;
    }
    If you are running Linux, save the above as e.g. example.c, then compile using e.g.
    Code:
    mpicc -W -Wall example.c -o example
    and then run it, for example locally with
    Code:
    mpirun -np 9 ./example
    Note that I added some ugly macros to illustrate proper error handling, and to include wall-clock timestamps in the output. (The timestamps are not that useful on clusters, since the clocks aren't that tightly coupled.)
    On my four-way AMD Athlon II X4 640, running
    Code:
    mpirun -np 9 ./example | sort
    sorts the output by the local wall clock, and is typically something like
    Code:
    16:35:07.701212 UTC on process 0: Ready to send.
    16:35:07.701232 UTC on process 2: Ready to receive.
    16:35:07.701268 UTC on process 1: Ready to receive.
    16:35:07.701329 UTC on process 2: Preparing.
    16:35:07.701370 UTC on process 1: Preparing.
    16:35:07.701407 UTC on process 0: First batch sent.
    16:35:07.701420 UTC on process 0: Second batch sent.
    16:35:07.701422 UTC on process 1: Received 1 and 2, responding with 2.
    16:35:07.701464 UTC on process 1: Cleanup.
    16:35:07.701470 UTC on process 0: Waiting for communications to complete.
    16:35:07.701528 UTC on process 3: Ready to receive.
    16:35:07.701642 UTC on process 3: Preparing.
    16:35:07.702271 UTC on process 8: Ready to receive.
    16:35:07.702351 UTC on process 2: Received 2 and 1, responding with 2.
    16:35:07.702385 UTC on process 2: Cleanup.
    16:35:07.702455 UTC on process 8: Preparing.
    16:35:07.702525 UTC on process 8: Received 8 and 1, responding with 8.
    16:35:07.702550 UTC on process 8: Cleanup.
    16:35:07.703496 UTC on process 4: Ready to receive.
    16:35:07.703993 UTC on process 5: Ready to receive.
    16:35:07.704135 UTC on process 4: Preparing.
    16:35:07.704188 UTC on process 5: Preparing.
    16:35:07.704260 UTC on process 3: Received 3 and 2, responding with 6.
    16:35:07.704317 UTC on process 3: Cleanup.
    16:35:07.704418 UTC on process 5: Received 5 and 2, responding with 10.
    16:35:07.704456 UTC on process 4: Received 4 and 1, responding with 4.
    16:35:07.704498 UTC on process 5: Cleanup.
    16:35:07.704539 UTC on process 4: Cleanup.
    16:35:07.710181 UTC on process 7: Ready to receive.
    16:35:07.711159 UTC on process 6: Ready to receive.
    16:35:07.714085 UTC on process 6: Preparing.
    16:35:07.714286 UTC on process 7: Preparing.
    16:35:07.714340 UTC on process 6: Received 6 and 1, responding with 6.
    16:35:07.714412 UTC on process 6: Cleanup.
    16:35:07.714602 UTC on process 7: Received 7 and 2, responding with 14.
    16:35:07.714672 UTC on process 7: Cleanup.
    16:35:07.714677 UTC on process 0: Result from process 1: OP(1, 2) = 2
    16:35:07.714684 UTC on process 0: Result from process 2: OP(2, 1) = 2
    16:35:07.714689 UTC on process 0: Result from process 3: OP(3, 2) = 6
    16:35:07.714693 UTC on process 0: Result from process 4: OP(4, 1) = 4
    16:35:07.714697 UTC on process 0: Result from process 5: OP(5, 2) = 10
    16:35:07.714701 UTC on process 0: Result from process 6: OP(6, 1) = 6
    16:35:07.714705 UTC on process 0: Result from process 7: OP(7, 2) = 14
    16:35:07.714710 UTC on process 0: Result from process 8: OP(8, 1) = 8
    Without the sort, the output is often not in wall-clock order. This is because MPI forwards the standard output/error streams to the root process, and that latency often changes the output order.

    Hope you find this useful. Any questions?
    Last edited by Nominal Animal; 06-26-2013 at 10:36 AM. Reason: Correct output. 'node' was wrong, it's 'process'.

  4. #4
    Registered User MutantJohn's Avatar
    Join Date
    Feb 2013
    Posts
    2,665
    Oh God, MPI.

    Why can't we just use pthreads???

    I get that MPI is really, really scalable and that's awesome but can't we just allocate a dynamic array of pthread pointers or w/e? Like, can't we just go pthread[user_input_number_here]?
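    Something like this is what I mean -- just a rough sketch, with error checking omitted:
    Code:
    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h>
    
    static void *worker(void *arg)
    {
        long id = *(long *)arg;
        printf("worker %ld running\n", id);
        return NULL;
    }
    
    int main(int argc, char *argv[])
    {
        long       count, i;
        pthread_t *threads;
        long      *ids;
    
        /* Thread count chosen by the user at run time. */
        count = (argc > 1) ? atol(argv[1]) : 4;
        if (count < 1)
            count = 1;
    
        threads = malloc(count * sizeof *threads);
        ids     = malloc(count * sizeof *ids);
        if (!threads || !ids)
            return 1;
    
        for (i = 0; i < count; i++) {
            ids[i] = i;
            pthread_create(&threads[i], NULL, worker, &ids[i]);
        }
        for (i = 0; i < count; i++)
            pthread_join(threads[i], NULL);
    
        free(threads);
        free(ids);
        return 0;
    }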

  5. #5
    Ticked and off
    Join Date
    Oct 2011
    Location
    La-la land
    Posts
    1,728
    Quote Originally Posted by MutantJohn View Post
    Why can't we just use pthreads???
    1. most code is written by people who don't know how to avoid the unwanted cache effects when using threads
      (the kind of people who write a threaded version, and then claim that threads are slower because their threaded version is slower than their process-based one)
    2. distributing threaded processes (to multiple machines) is complicated; you need some kind of library to manage the communications anyway

    On x86 and x86-64, OpenMPI uses shared memory maps (and, I believe, an extra management thread or process), so local message passing ends up being just a bit of processing followed by a memory copy. The overhead is not that bad.

    I personally much prefer threads, but a lot of computational clusters are configured for process-based parallelism, not thread-based. So, if you want to run your parallel code distributed across a few nodes in a cluster, you usually have to use MPI.

    It's mostly a configuration issue. Most batch job queues don't, by default, tie multithreaded processes to specific CPUs, which leads to multithreaded processes using CPU resources not assigned to them. Because multithreading is rare, most admins just sigh and forbid multithreaded distributed parallel processing, instead of fixing their job management applications and configurations. Missing features in queue managers are usually fixable via pre-job scripts -- definitely so in Linux; see the sketch after the next paragraph.

    Optimally, we'd just fix the existing queue manager applications instead. (It's not like adding CPU masks and adjusting CPU and I/O priorities of the batch jobs is that hard. All you have to do is do it right.)
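    As a minimal sketch of the kind of thing a Linux pre-job hook could do (the CPU numbers and the nice value are made-up placeholders; a real hook would exec() the actual batch job afterwards, and I/O priority could be handled similarly, e.g. with the ionice utility):
    Code:
    #define _GNU_SOURCE
    #include <sched.h>
    #include <sys/resource.h>
    #include <stdio.h>
    
    int main(void)
    {
        cpu_set_t set;
    
        /* Restrict this process (and its future children) to CPUs 0 and 1. */
        CPU_ZERO(&set);
        CPU_SET(0, &set);
        CPU_SET(1, &set);
        if (sched_setaffinity(0, sizeof set, &set) == -1) {
            perror("sched_setaffinity");
            return 1;
        }
    
        /* Lower the CPU scheduling priority (nice value) of the job. */
        if (setpriority(PRIO_PROCESS, 0, 10) == -1) {
            perror("setpriority");
            return 1;
        }
    
        printf("CPU mask and priority set; the batch job would start here.\n");
        return 0;
    }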

    I do suspect that an extra I/O thread per node, plus a small IP-based (UDP/IP, in particular) library for internode communications, should beat MPI on GbE- and 10GbE-based networks when the nodes have similar architectures. (One nifty feature of MPI is that it can do endianness conversion on the fly, if needed.) I don't currently have physical access to an InfiniBand cluster, so I cannot compare IPoIB to native IB, or find out what kind of interfaces the library would need for the API to be portable between IB and IP networks. I'd love to do that, though.

    (One of the things I've done a lot of research on is how to avoid the impact of communication latency in various simulation models. That is, how to do the necessary communications so that they do not delay computation.)

  6. #6
    Registered User MutantJohn's Avatar
    Join Date
    Feb 2013
    Posts
    2,665
    Ooh, okay. I have a few questions then, or at least a couple of things stood out to me that I'd like to discuss.

    You mentioned unwanted cache effects from threads. What do you mean by this?

    You also mentioned process-based parallelism. As I understand it, or can at least guess, process-based means you create threads proportional to the number of processes. So, if we had a full quadtree, we could create threads to navigate each branch: one thread that brings us the root, then 4 for its children, then 4 for each of those, so the thread mapping goes 1 -> 4 -> 16 -> 64 -> ... -> 4^n.

    Or there's the basic CUDA example of a dot product between two vectors, each with about 4 million elements. In that one, a thread is created for each index and matching elements are multiplied. Basically, you have (a1, a2, a3) and (b1, b2, b3), the dot product is a1*b1 + a2*b2 + a3*b3, and the logic is that each product can be done independently before the final sum is atomically accumulated. It looks like this:

    Code:
    #include <stdio.h>
    #include <stdlib.h>
    #include <cuda.h>
    
    #define n (2048*2048)
    #define tpb 512
    
    //Each block computes the partial dot product of its 512-element slice
    //in shared memory; thread 0 of the block then adds that partial sum
    //atomically into the final result.
    __global__ void dot(int *a, int *b, int *c) {
    
       __shared__ int temp[tpb];
       int index = threadIdx.x + blockIdx.x*blockDim.x;
       temp[threadIdx.x] = a[index] * b[index];
    
       __syncthreads();
    
       if (0 == threadIdx.x) {
    
          int sum = 0;
          for (int i=0; i<tpb; i++) {
             sum += temp[i];
          }
    
          atomicAdd(c, sum);
       }
    }
    
    int main(void) {
    
       int *a, *b, *c;
       int *dev_a, *dev_b, *dev_c;
       int size = n*sizeof(*a);
    
       a = (int*)malloc(size);
       b = (int*)malloc(size);
       c = (int*)malloc(sizeof(*c));
    
       cudaMalloc((void**)&dev_a, size);
       cudaMalloc((void**)&dev_b, size);
       cudaMalloc((void**)&dev_c, sizeof(*c));
    
       //Note: with n this large the 32-bit sum overflows; the printed
       //value is only meaningful for much smaller inputs.
       for (int i=0; i<n; i++) {
          a[i] = i;
          b[i] = i + 1;
       }
    
       cudaMemcpy(dev_a, a, size, cudaMemcpyHostToDevice);
       cudaMemcpy(dev_b, b, size, cudaMemcpyHostToDevice);
       cudaMemset(dev_c, 0, sizeof(*c));   //the atomicAdd target must start at zero
    
       dot<<<n/tpb, tpb>>>(dev_a, dev_b, dev_c);
    
       cudaMemcpy(c, dev_c, sizeof(*c), cudaMemcpyDeviceToHost);
    
       printf("%d\n", *c);   //dereference: c points at the result
    
       free(a);
       free(b);
       free(c);
    
       cudaFree(dev_a);
       cudaFree(dev_b);
       cudaFree(dev_c);
    
       return 0;
    }
    Is this an example of process-based parallelism?

  7. #7
    Ticked and off
    Join Date
    Oct 2011
    Location
    La-la land
    Posts
    1,728
    Quote Originally Posted by MutantJohn View Post
    You mentioned unwanted cache effects from threads. What do you mean by this?
    Assume you have a number of threads working on data, say using thread pools, and data bucket brigades, and everything is working fine.

    Whatever the work is, it typically involves some set of parameters -- say, limits or thresholds -- that each thread uses often in its work.

    Now, if the programmer does not generate a private copy of those parameters for each thread, the threads will access the same memory for the constants/parameters. If the threads are running on different cores (or CPUs), the cache lines containing the data will ping-pong between the cores/CPUs, causing a significant slowdown in the access. This is called cache-line ping-pong, and it is a common performance-reducing effect.

    Depending on the CPU architecture, it may be possible to avoid that by keeping the parameters in a read-only locked memory-mapped area. (If it does not help, then the architecture almost certainly suffers from cache-line ping pong for text, i.e. binary code, too. And that's bad. Or your C library/kernel does not really allow you to mark memory-mapped pages *really* read-only.)

    Usually, the set of parameters is small enough so that a private copy is perfectly acceptable. When you have huge parameter sets each thread needs to access, you do need to do the mmap()/mremap()/mlock() dance, but few programmers I've ever met bother. (I do, obviously.)
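    Here's a minimal sketch of the closely related false-sharing case (my own illustration, not the parameter/mmap() case above): per-thread counters packed next to each other end up in one cache line, so every write bounces that line between cores; giving each counter its own cache line avoids the ping-pong. Compile with e.g. gcc -O2 -pthread.
    Code:
    #include <pthread.h>
    #include <stdio.h>
    
    #define THREADS    4
    #define ITERATIONS 50000000L
    
    /* Give each counter its own cache line (64-byte lines assumed here),
     * so no two threads ever write to the same cache line. */
    struct padded_counter {
        _Alignas(64) long value;
    };
    
    static struct padded_counter counters[THREADS];
    
    static void *worker(void *arg)
    {
        struct padded_counter *c = arg;
        long i;
    
        for (i = 0; i < ITERATIONS; i++)
            c->value++;   /* touches only this thread's own cache line */
    
        return NULL;
    }
    
    int main(void)
    {
        pthread_t t[THREADS];
        int i;
    
        for (i = 0; i < THREADS; i++)
            pthread_create(&t[i], NULL, worker, &counters[i]);
        for (i = 0; i < THREADS; i++)
            pthread_join(t[i], NULL);
    
        for (i = 0; i < THREADS; i++)
            printf("counter %d = %ld\n", i, counters[i].value);
        return 0;
    }
    Replace the struct with a plain long counters[THREADS] array, and all four counters land in one cache line; the same loops then typically run noticeably slower, purely from the cache-line traffic.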

    Quote Originally Posted by MutantJohn View Post
    You also mentioned process-based parallelism. As I understand it, or can at least guess, process-based means you create threads proportional to the number of processes.
    No. Instead of threads, you create duplicates of the same process -- the executable program instance. You do this via fork(), or by executing the same binary multiple times, perhaps on different machines.

    If the processes are fork()ed, then the most common communications method is shared memory (via an anonymous memory map). Otherwise they use pipes, sockets, TCP/IP or UDP/IP, infiniband, or something else, to communicate with each other.

    In some cases, like OpenMPI, processes running on the same machine communicate via shared memory, while processes on different machines use IP or InfiniBand, for best speed. All this is completely transparent to the programmer, and that's what makes MPI powerful.
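    As a minimal sketch (mine, no MPI involved) of the fork()-plus-shared-memory case: parent and child share one anonymously mapped page, so the child can publish a result without any explicit message passing.
    Code:
    #define _DEFAULT_SOURCE
    #include <sys/mman.h>
    #include <sys/wait.h>
    #include <unistd.h>
    #include <stdio.h>
    
    int main(void)
    {
        int  *shared;
        pid_t pid;
    
        /* One shared int, visible to both parent and child after fork(). */
        shared = mmap(NULL, sizeof *shared, PROT_READ | PROT_WRITE,
                      MAP_SHARED | MAP_ANONYMOUS, -1, 0);
        if (shared == MAP_FAILED) {
            perror("mmap");
            return 1;
        }
        *shared = 0;
    
        pid = fork();
        if (pid == -1) {
            perror("fork");
            return 1;
        }
    
        if (pid == 0) {
            /* Child: do the "work" and publish the result. */
            *shared = 42;
            _exit(0);
        }
    
        /* Parent: wait for the child to finish, then read the result. */
        waitpid(pid, NULL, 0);
        printf("child produced %d\n", *shared);
    
        munmap(shared, sizeof *shared);
        return 0;
    }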

    A summary of the terms I use:
    • parallel: more than one at the same time
    • distributed: on more than one computer
    • thread-parallel: single process, but uses multiple threads
    • process-parallel: multiple processes, which talk to each other to coordinate their work
    • distributed thread-parallel: individual processes, each with multiple threads, running on a number of computers.
      Typically, each computer has an extra thread which handles the communications with the other computers.
    • distributed process-parallel: individual processes on a number of machines; usually the processes are single-threaded, except that some message-passing interfaces use an extra thread or threads per process to handle the communications asynchronously


    If you consider CUDA or OpenCL, you use thread-based parallelization, and only that, when you use a single computer.

    You can use CUDA and/or OpenCL with MPI, so if you have multiple machines with suitable cards, you're using CUDA/OpenCL with many threads in multiple processes. (Each process just runs on a different computer.)

    If you managed to install both AMD and Nvidia cards successfully on the same computer, I guess you could also do process-parallel OpenCL, or even mixed CUDA-OpenCL. Due to the library linkage, each process would have to be tied to one or more cards, all from the same manufacturer, but the processes could communicate with each other (via shared memory, sockets, MPI...), to coordinate their work and pass the data, and effectively work on the same data in parallel.

    Did this help? Any more questions?
