Concurrencia en Java 6. Parte 1: Barriers

| | Comments (0) | TrackBacks (0)

Empiezo aquí lo que espero sea una larga serie de artículos sobre concurrency en Java 5 y 6. En los primeros artículos hablaré de mecanismos de sincronización.

Uno de los mecanismos de sincronización introducidos en Java 5 es la barrera (Barrier). Las barreras se usan para bloquear un grupo de threads hasta que todos ellos hayan completado su trabajo. Solo entonces se desbloquean todos los threads.

El JDK dispone de la clase CyclicBarrier, que es especialmente adecuada para el caso de algoritmos iterativos paralelizables que pueden descomponerse en un número discreto de subtareas.

Cuando un thread ha acabado su tarea, ha de bloquearse y esperar a que el resto de threads completen las suyas. Para ello, el thread llamará al método await de la clase CyclicBarrier. Cuando el último de los threads haya completado su trabajo, llamará a await, el objeto CyclicBarrier detectará que todos los threads han acabado y los desbloqueará a todos. En ese momento, el objeto CyclicBarrier se reinicializa para que pueda volver a ser utilizado de nuevo.

Cuando se crea el objeto de tipo CyclicBarrier, hay que decirle el número de threads que componen el grupo de threads. Además, es posible pasarle una instancia de la clase Runnable, que se ejecutará cuando todos los threads hayan invocado el método await, justo antes de que se desbloqueen todos los threads.

Por ejemplo, supongamos que tenemos un array de 1000 enteros, y que queremos obtener la suma de todos sus elementos. Asumimos también que disponemos de un sistema con, por ejemplo, 4 procesadores. El algoritmo es facilmente paralelizable: dividimos conceptualmente el array en 4 partes, y asignamos cada una de ellas a un thread. Al final, sumamos los cuatro valores devueltos por los 4 threads. Naturalmente, esto sería demasiado sencillo :), así que para complicarlo un poco introduciremos un requerimiento adicional: queremos que el programa nos dé feedback mientras está trabajando. Para ello, queremos que se impriman por pantalla las sumas parciales cada 100 números. Como tenemos 4 threads, cada thread sumará 25 elementos y esperará a que los otros threads hagan lo mismo. Entonces se sumarán los resultados de cada uno de los 4 threas y el ciclo volverá a empezar.

El código está dividido en tres clases. La clase Adder es la que hace el trabajo de recorrer el array e ir sumando los elementos. Adder implementa Runnable, y es ejecutada por un thread. Los threads se instancia en la clase Main. Por último, una instancia de la clase PartialPrinter es notificada por cada uno de los threads cada vez que se completa una suma parcial de los elementos del array. Además, es la clase que almacena las sumas parciales de los elementos del array.

package concurrency;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Adder implements Runnable
{

  private CyclicBarrier barrier;
  private int[] array;
  private int startPosition;
  private int endPosition;
  private int partial;
  private PartialPrinter partialPrinter;

  public Adder(CyclicBarrier barrier, int[] array, int startPosition,
          int endPosition, int partial, PartialPrinter partialPrinter)
  {
    this.barrier = barrier;
    this.array = array;
    this.startPosition = startPosition;
    this.endPosition = endPosition;
    this.partial = partial;
    this.partialPrinter = partialPrinter;
  }

  public void run()
  {
    int start = this.startPosition;
    int sum = 0;
    int howMany = 0;
    while (start <= this.endPosition) {
      sum += this.array[start];
      howMany++;
      if (howMany == partial) {
        partialPrinter.notifyPartial(sum);
        howMany = 0;
        sum=0;
        try {
          barrier.await();
        }
        catch (InterruptedException ex) {
          Logger.getLogger(Adder.class.getName()).log(Level.SEVERE, null, ex);
        }
        catch (BrokenBarrierException ex) {
          Logger.getLogger(Adder.class.getName()).log(Level.SEVERE, null, ex);
        }
      }
      start++;
    }
  }
}

package concurrency;

public class PartialPrinter implements Runnable{
  private int sumOfAll=0;
 
  public synchronized void notifyPartial(int sum)
  {
    this.sumOfAll+=sum;
  }
 
  public void run()
  {
    System.out.println("Sum so far: "+sumOfAll);
  }
}


package concurrency;

import java.util.Random;
import java.util.concurrent.CyclicBarrier;

public class Main
{

  private static int LONG_ARRAY = 1000;
  private static int THREADS = 4;
  private static int BLOCK_SIZE = LONG_ARRAY / THREADS;
  private static int PARTIAL_BLOCK = 25;

  public static void main(String[] args)
  {
    PartialPrinter partialPrinter = new PartialPrinter();
    CyclicBarrier barrier = new CyclicBarrier(4, partialPrinter);

    int[] array = initializaArray(LONG_ARRAY);
    Adder[] adder = new Adder[THREADS];
    for (int i = 0; i < THREADS; i++) {
      adder[i] = new Adder(barrier, array, i * BLOCK_SIZE, (i * BLOCK_SIZE) +
              BLOCK_SIZE - 1, PARTIAL_BLOCK, partialPrinter);
    }
    for (int i = 0; i < THREADS; i++) {
      new Thread(adder[i]).start();
    }
  }

  private static int[] initializaArray(int howMany)
  {
    int[] array = new int[howMany];
    Random random = new Random();
    for (int i = 0; i < howMany; i++) {
      array[i] = random.nextInt(10);
    }
    return array;
  }
}

0 TrackBacks

Listed below are links to blogs that reference this entry: Concurrencia en Java 6. Parte 1: Barriers.

TrackBack URL for this entry: http://www.juanmiguelgarcia.com/mt-tb.cgi/2

Leave a comment

About this Entry

This page contains a single entry by Juan Miguel published on June 5, 2008 7:34 PM.

Como descargar MP3s de manera legal y casi gratuita was the previous entry in this blog.

Instalando Ubuntu 8.04 en un disco duro externo is the next entry in this blog.

Find recent content on the main index or look in the archives to find all content.

Categories

Pages

Powered by Movable Type 4.1