J'ai un en-tête et des données que je dois représenter dans un Byte Array. Et j'ai un format particulier pour emballer l'en-tête dans un Byte Array et aussi un format différent pour emballer les données dans un Byte Array. Après avoir ces deux, je dois en faire un Byte Array final.

Ci-dessous est la mise en page qui est définie comme dans C++ et en conséquence je dois faire dans Java.

// below is my header offsets layout

// addressedCenter must be the first byte
static constexpr uint32_t  addressedCenter      = 0;
static constexpr uint32_t  version              = addressedCenter + 1;
static constexpr uint32_t  numberOfRecords      = version + 1;
static constexpr uint32_t  bufferUsed           = numberOfRecords + sizeof(uint32_t);
static constexpr uint32_t  location             = bufferUsed + sizeof(uint32_t);
static constexpr uint32_t  locationFrom         = location + sizeof(CustomerAddress);
static constexpr uint32_t  locationOrigin       = locationFrom + sizeof(CustomerAddress);
static constexpr uint32_t  partition            = locationOrigin + sizeof(CustomerAddress);
static constexpr uint32_t  copy                 = partition + 1;

// this is the full size of the header
static constexpr uint32_t headerOffset = copy + 1;

Et CustomerAddress est un typedef pour uint64_t et il est composé comme ceci -

typedef uint64_t   CustomerAddress;

void client_data(uint8_t datacenter, 
                 uint16_t clientId, 
                 uint8_t dataId, 
                 uint32_t dataCounter,
                 CustomerAddress& customerAddress)
{
    customerAddress = (uint64_t(datacenter) << 56)
                    + (uint64_t(clientId) << 40)
                    + (uint64_t(dataId) << 32)
                    + dataCounter;
}

Et ci-dessous est ma disposition des données -

// below is my data layout -
//
// key type - 1 byte
// key len - 1 byte
// key (variable size = key_len)
// timestamp (sizeof uint64_t)
// data size (sizeof uint16_t)
// data (variable size = data size)

Énoncé du problème: -

Maintenant, pour une partie du projet, j'essaie de représenter des éléments globaux dans une classe particulière en Java afin que je puisse simplement passer les champs nécessaires et que cela puisse en faire un Byte Array final qui aura l'en-tête en premier puis les données:

Voici ma classe DataFrame:

public final class DataFrame {
  private final byte addressedCenter;
  private final byte version;
  private final Map<byte[], byte[]> keyDataHolder;
  private final long location;
  private final long locationFrom;
  private final long locationOrigin;
  private final byte partition;
  private final byte copy;

  public DataFrame(byte addressedCenter, byte version,
      Map<byte[], byte[]> keyDataHolder, long location, long locationFrom,
      long locationOrigin, byte partition, byte copy) {
    this.addressedCenter = addressedCenter;
    this.version = version;
    this.keyDataHolder = keyDataHolder;
    this.location = location;
    this.locationFrom = locationFrom;
    this.locationOrigin = locationOrigin;
    this.partition = partition;
    this.copy = copy;
  }

  public byte[] serialize() {
    // All of the data is embedded in a binary array with fixed maximum size 70000
    ByteBuffer byteBuffer = ByteBuffer.allocate(70000);
    byteBuffer.order(ByteOrder.BIG_ENDIAN);

    int numOfRecords = keyDataHolder.size();
    int bufferUsed = getBufferUsed(keyDataHolder); // 36 + dataSize + 1 + 1 + keyLength + 8 + 2;

    // header layout
    byteBuffer.put(addressedCenter); // byte
    byteBuffer.put(version); // byte
    byteBuffer.putInt(numOfRecords); // int
    byteBuffer.putInt(bufferUsed); // int
    byteBuffer.putLong(location); // long
    byteBuffer.putLong(locationFrom); // long
    byteBuffer.putLong(locationOrigin); // long
    byteBuffer.put(partition); // byte
    byteBuffer.put(copy); // byte

    // now the data layout
    for (Map.Entry<byte[], byte[]> entry : keyDataHolder.entrySet()) {
      byte keyType = 0;
      byte keyLength = (byte) entry.getKey().length;
      byte[] key = entry.getKey();
      byte[] data = entry.getValue();
      short dataSize = (short) data.length;

      ByteBuffer dataBuffer = ByteBuffer.wrap(data);
      long timestamp = 0;

      if (dataSize > 10) {
        timestamp = dataBuffer.getLong(2);              
      }       

      byteBuffer.put(keyType);
      byteBuffer.put(keyLength);
      byteBuffer.put(key);
      byteBuffer.putLong(timestamp);
      byteBuffer.putShort(dataSize);
      byteBuffer.put(data);
    }
    return byteBuffer.array();
  }

  private int getBufferUsed(final Map<byte[], byte[]> keyDataHolder) {
    int size = 36;
    for (Map.Entry<byte[], byte[]> entry : keyDataHolder.entrySet()) {
      size += 1 + 1 + 8 + 2;
      size += entry.getKey().length;
      size += entry.getValue().length;
    }
    return size;
  }  
}

Voici comment j'utilise ma classe DataFrame ci-dessus:

  public static void main(String[] args) throws IOException {
    // header layout
    byte addressedCenter = 0;
    byte version = 1;

    long location = packCustomerAddress((byte) 12, (short) 13, (byte) 32, (int) 120);
    long locationFrom = packCustomerAddress((byte) 21, (short) 23, (byte) 41, (int) 130);
    long locationOrigin = packCustomerAddress((byte) 21, (short) 24, (byte) 41, (int) 140);

    byte partition = 3;
    byte copy = 0;

    // this map will have key as the actual key and value as the actual data, both in byte array
    // for now I am storing only two entries in this map
    Map<byte[], byte[]> keyDataHolder = new HashMap<byte[], byte[]>();
    for (int i = 1; i <= 2; i++) {
      keyDataHolder.put(generateKey(), getMyData());
    }

    DataFrame records =
        new DataFrame(addressedCenter, version, keyDataHolder, location, locationFrom,
            locationOrigin, partition, copy);

    // this will give me final packed byte array
    // which will have header and data in it.
    byte[] packedArray = records.serialize();
  }

  private static long packCustomerAddress(byte datacenter, short clientId, byte dataId,
      int dataCounter) {
    return ((long) (datacenter) << 56) | ((long) clientId << 40) | ((long) dataId << 32)
        | ((long) dataCounter);
  }   

Comme vous pouvez le voir dans ma classe DataFrame, j'alloue ByteBuffer avec une taille prédéfinie de 70000. Existe-t-il un meilleur moyen d'allouer la taille que j'utilise lors de la création de ByteBuffer au lieu d'utiliser un 70000 codé en dur?

Existe-t-il également un meilleur moyen par rapport à ce que je fais qui regroupe mon en-tête et mes données dans un tableau d'octets? Je dois également m'assurer qu'il est thread-safe car il peut être appelé par plusieurs threads.

5
john 17 janv. 2017 à 02:37

2 réponses

Meilleure réponse

Existe-t-il un meilleur moyen d'allouer la taille que j'utilise lors de la création de ByteBuffer au lieu d'utiliser un 70000 codé en dur?

Il existe au moins deux approches qui ne se chevauchent pas. Vous pouvez utiliser les deux.

L'un est le pool de tampons. Vous devez savoir de combien de tampons vous avez besoin pendant les périodes de pointe et utiliser un maximum au-dessus, par exemple max + max / 2, max + moyen, max + mode, 2 * max.

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Consumer;
import java.util.function.Function;

public class ByteBufferPool {
    private final int bufferCapacity;
    private final LinkedBlockingDeque<ByteBuffer> queue;

    public ByteBufferPool(int limit, int bufferCapacity) {
        if (limit < 0) throw new IllegalArgumentException("limit must not be negative.");
        if (bufferCapacity < 0) throw new IllegalArgumentException("bufferCapacity must not be negative.");

        this.bufferCapacity = bufferCapacity;
        this.queue = (limit == 0) ? null : new LinkedBlockingDeque<>(limit);
    }

    public ByteBuffer acquire() {
        ByteBuffer buffer = (queue == null) ? null : queue.pollFirst();
        if (buffer == null) {
            buffer = ByteBuffer.allocate(bufferCapacity);
        }
        else {
            buffer.clear();
            buffer.order(ByteOrder.BIG_ENDIAN);
        }
        return buffer;
    }

    public boolean release(ByteBuffer buffer) {
        if (buffer == null) throw new IllegalArgumentException("buffer must not be null.");
        if (buffer.capacity() != bufferCapacity) throw new IllegalArgumentException("buffer has unsupported capacity.");
        if (buffer.isDirect()) throw new IllegalArgumentException("buffer must not be direct.");
        if (buffer.isReadOnly()) throw new IllegalArgumentException("buffer must not be read-only.");

        return (queue == null) ? false : queue.offerFirst(buffer);
    }

    public void withBuffer(Consumer<ByteBuffer> action) {
        if (action == null) throw new IllegalArgumentException("action must not be null.");

        ByteBuffer buffer = acquire();
        try {
            action.accept(buffer);
        }
        finally {
            release(buffer);
        }
    }

    public <T> T withBuffer(Function<ByteBuffer, T> function) {
        if (function == null) throw new IllegalArgumentException("function must not be null.");

        ByteBuffer buffer = acquire();
        try {
            return function.apply(buffer);
        }
        finally {
            release(buffer);
        }
    }

    public <T> CompletionStage<T> withBufferAsync(Function<ByteBuffer, CompletionStage<T>> asyncFunction) {
        if (asyncFunction == null) throw new IllegalArgumentException("asyncFunction must not be null.");

        ByteBuffer buffer = acquire();
        CompletionStage<T> future = null;
        try {
            future = asyncFunction.apply(buffer);
        }
        finally {
            if (future == null) {
                release(buffer);
            }
            else {
                future = future.whenComplete((result, throwable) -> release(buffer));
            }
        }
        return future;
    }
}

Les méthodes withBuffer permettent une utilisation directe du pool, tandis que les acquire et release permettent de séparer les points d'acquisition et de libération.

Un autre est la séparation de l'interface de sérialisation, par ex. les put, putInt et putLong, où vous pouvez ensuite implémenter une classe de comptage d'octets et une classe de mise en mémoire tampon d'octets réelle. Vous devez ajouter une méthode à une telle interface pour savoir si le sérialiseur compte des octets ou une mise en mémoire tampon, afin d'éviter la génération d'octets inutiles, et une autre méthode pour incrémenter l'utilisation des octets directement, utile lors du calcul de la taille d'une chaîne dans certains encodages sans sérialiser réellement .

public interface ByteSerializer {
    ByteSerializer put(byte value);

    ByteSerializer putInt(int value);

    ByteSerializer putLong(long value);

    boolean isSerializing();

    ByteSerializer add(int bytes);

    int position();
}
public class ByteCountSerializer implements ByteSerializer {
    private int count = 0;

    @Override
    public ByteSerializer put(byte value) {
        count += 1;
        return this;
    }

    @Override
    public ByteSerializer putInt(int value) {
        count += 4;
        return this;
    }

    @Override
    public ByteSerializer putLong(long value) {
        count += 8;
        return this;
    }

    @Override
    public boolean isSerializing() {
        return false;
    }

    @Override
    public ByteSerializer add(int bytes) {
        if (bytes < 0) throw new IllegalArgumentException("bytes must not be negative.");

        count += bytes;
        return this;
    }

    @Override
    public int position() {
        return count;
    }
}
import java.nio.ByteBuffer;

public class ByteBufferSerializer implements ByteSerializer {
    private final ByteBuffer buffer;

    public ByteBufferSerializer(int bufferCapacity) {
        if (bufferCapacity < 0) throw new IllegalArgumentException("bufferCapacity must not be negative.");

        this.buffer = ByteBuffer.allocate(bufferCapacity);
    }

    @Override
    public ByteSerializer put(byte value) {
        buffer.put(value);
        return this;
    }

    @Override
    public ByteSerializer putInt(int value) {
        buffer.putInt(value);
        return this;
    }

    @Override
    public ByteSerializer putLong(long value) {
        buffer.putLong(value);
        return this;
    }

    @Override
    public boolean isSerializing() {
        return true;
    }

    @Override
    public ByteSerializer add(int bytes) {
        if (bytes < 0) throw new IllegalArgumentException("bytes must not be negative.");

        for (int b = 0; b < bytes; b++) {
            buffer.put((byte)0);
        }
        return this;
        // or throw new UnsupportedOperationException();
    }

    @Override
    public int position() {
        return buffer.position();
    }

    public ByteBuffer buffer() {
        return buffer;
    }
}

Dans votre code, vous feriez quelque chose du genre (non testé):

ByteCountSerializer counter = new ByteCountSerializer();
dataFrame.serialize(counter);
ByteBufferSerializer serializer = new ByteByfferSerializer(counter.position());
dataFrame.serialize(serializer);
ByteBuffer buffer = serializer.buffer();
// ... write buffer, ?, profit ...

Votre méthode DataFrame.serialize doit être refactorisée pour accepter un ByteSerializer, et dans les cas où elle générerait des données, elle doit vérifier isSerializing pour savoir si elle ne doit calculer que la taille ou écrire réellement des octets.

Je laisse la combinaison des deux approches comme un exercice, principalement parce que cela dépend beaucoup de la façon dont vous décidez de le faire.

Par exemple, vous pouvez faire ByteBufferSerializer utiliser le pool directement et conserver une capacité arbitraire (par exemple votre 70000), vous pouvez mettre en commun les ByteBuffer s par capacité (mais au lieu de la capacité nécessaire, utilisez le moins de puissance de 2 supérieur à la capacité requise, et définissez la limite du tampon avant de revenir de acquire), ou vous pouvez mettre en commun les ByteBufferSerializer directement tant que vous ajoutez une méthode reset().

Existe-t-il également un meilleur moyen par rapport à ce que je fais qui regroupe mon en-tête et mes données dans un tableau d'octets?

Oui. Passez autour de l'instance de mise en mémoire tampon d'octets au lieu que certaines méthodes renvoient des tableaux d'octets qui sont ignorés au moment où leur longueur est vérifiée ou leur contenu est copié.

Je dois également m'assurer qu'il est thread-safe car il peut être appelé par plusieurs threads.

Tant que chaque tampon est utilisé par un seul thread, avec une synchronisation appropriée, vous n'avez pas à vous inquiéter.

Une bonne synchronisation signifie que votre gestionnaire de pool a une sémantique d'acquisition et de publication dans ses méthodes, et que si un tampon est utilisé par plusieurs threads entre son extraction et son retour dans le pool, vous ajoutez une sémantique de libération dans le thread qui arrête d'utiliser le tampon et ajouter une sémantique d'acquisition dans le thread qui commence à utiliser le tampon. Par exemple, si vous passez le tampon via CompletableFuture s, vous ne devriez pas avoir à vous en soucier, ou si vous communiquez explicitement entre les threads avec un Exchanger ou une implémentation correcte de { {X2}}.

De la description du package de java.util.concurrent:

Les méthodes de toutes les classes de java.util.concurrent et de ses sous-packages étendent ces garanties à une synchronisation de niveau supérieur. En particulier:

  • Actions dans un thread avant de placer un objet dans une collection simultanée qui se produit avant actions après l'accès ou la suppression de cet élément de la collection dans un autre thread.

  • Actions dans un thread avant la soumission d'un Runnable à un Executor arrive-avant son exécution commence. De même pour Callables soumis à un ExecutorService.

  • Actions entreprises par le calcul asynchrone représenté par une action Future arrive-avant suite à la récupération du résultat via Future.get() dans un autre thread.

  • Actions préalables à la "libération" des méthodes de synchronisation telles que les actions Lock.unlock, Semaphore.release et CountDownLatch.countDown se produisant avant suite à une méthode "d'acquisition" réussie tels que Lock.lock, Semaphore.acquire, Condition.await et CountDownLatch.await sur le même objet synchroniseur dans un autre thread.

  • Pour chaque paire de threads qui réussissent à échanger des objets via un Exchanger, les actions antérieures à exchange() dans chaque thread se produisent avant celles qui suivent le {{X2 correspondant }} dans un autre fil de discussion.

  • Actions préalables à l'appel de CyclicBarrier.await et Phaser.awaitAdvance (ainsi que ses variantes) actions se produisant avant effectuées par l'action de barrière et actions effectuées par l'action de barrière < em> les actions qui se produisent avant suite à un retour réussi du await correspondant dans d'autres threads.

2
acelent 25 janv. 2017 à 10:03

Une autre façon de le faire serait d'utiliser un DataOutputStream autour d'un ByteArrayOutputStream, mais vous devriez concentrer votre réglage des performances autour des endroits où il est nécessaire, et ce n'est pas l'un d'entre eux. L'efficacité n'est en aucun cas un problème ici. Les E / S réseau dominent par ordres de grandeur.

Une autre raison d'utiliser un ByteArrayOutputStream est que vous n'avez pas à deviner la taille de la mémoire tampon à l'avance: elle augmentera si nécessaire.

Pour garantir la sécurité des threads, n'utilisez que des variables locales.

0
Marquis of Lorne 20 janv. 2017 à 00:45