How to populate a Mutation for a different Cassand

Posted 2019-05-29 08:09

Question:

I'm trying to implement a Cassandra trigger such that when there is an update or a delete on keyspace1.tableA, the trigger will add a row to keyspace1.tableB.

The column names in tableB are completely different from those in tableA.

I'm working with Cassandra 2.1, no option to move to a more recent version. Looking at the InvertedIndex trigger example at https://github.com/apache/cassandra/blob/cassandra-2.1/examples/triggers/src/org/apache/cassandra/triggers/InvertedIndex.java I can see the basics of adding a mutation:

From the InvertedIndex example:

    for (Cell cell : update)
    {
        // Skip the row marker and other empty values, since they lead to an empty key.
        if (cell.value().remaining() > 0)
        {
            Mutation mutation = new Mutation(properties.getProperty("keyspace"), cell.value());
            mutation.add(properties.getProperty("columnfamily"), cell.name(), key, System.currentTimeMillis());
            mutations.add(mutation);
        }
    }

The challenge is that in this example the cell name passed to mutation.add is cell.name(), which is simply taken from a cell that already exists in the update. In my case I need to build a cell name for a column of a different table.

For now, I'm just trying to store the time a change was made to tableA, so tableB has two columns:

  • changetime timeuuid
  • operation text

I need to add a mutation that will add a row to tableB with the changetime and operation performed. How can I add such a row mutation in Cassandra 2.1.12?

I have tried this but I am getting a null pointer exception in the trigger:

...
String keycol = "changetime";
ByteBuffer uuidKey = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes()); 
ColumnIdentifier ci = new ColumnIdentifier(keycol, false);
CellName cn = CellNames.simpleSparse(ci);
mutation = new Mutation(keyspace, uuidKey);
mutation.add(tableName,cn, uuidKey, System.currentTimeMillis());
...

Any assistance would be much appreciated - I have no knowledge of Cassandra internals so no amount of detail is too much information.

Answer 1:

The answer is to use the CFMetaData comparator to create the CellName needed by Mutation.add(...). To provide a specific example I'll use the schema and example from the Cassandra 3.0 AuditTrigger available at https://github.com/apache/cassandra/tree/cassandra-3.0/examples/triggers

In this case, the table we will be writing to is the test.audit table defined as follows:

CREATE TABLE test.audit (key timeuuid, keyspace_name text,
    table_name text, primary_key text, PRIMARY KEY(key));

This table has a partition key named "key" and no clustering columns. For definitions see https://cassandra.apache.org/doc/cql3/CQL.html#createTableStmt, section "Partition key and clustering columns".

This matters because makeCellName (used in the sample code that follows) takes a variable-length argument list: first one value for each clustering column, in order, identifying the row that will be affected, and then, as the last argument, the name of the column as a string.

When there are no clustering columns (as is the case in this schema), then the call to makeCellName takes a single argument: the name of the column.
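
To make the argument order concrete, here is a toy model in plain Java. It is not Cassandra code: describeCellName is a hypothetical helper that only mirrors the convention described above (clustering values first, column name last), and the clustering values in the second call are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

class CellNameLayoutSketch {
    // Hypothetical helper, NOT part of Cassandra. It only models the argument
    // convention: zero or more clustering values, then the column name.
    static String describeCellName(Object... components) {
        if (components.length == 0) {
            throw new IllegalArgumentException("at least the column name is required");
        }
        int last = components.length - 1;
        // Everything before the last argument is treated as a clustering value.
        List<Object> clustering = Arrays.asList(components).subList(0, last);
        return "clustering=" + clustering + ", column=" + components[last];
    }

    public static void main(String[] args) {
        // No clustering columns (the test.audit case): a single argument.
        System.out.println(describeCellName("primary_key"));
        // Made-up table with two clustering columns: their values come first.
        System.out.println(describeCellName(2019, "eu", "table_name"));
    }
}
```

For test.audit only the first form applies, which is why every makeCellName call in the trigger code passes just a column name.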

Putting all this together, the AuditTrigger function for Cassandra 2.1 that does the same thing as the 3.0 example looks like this:

public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
{
    CFMetaData cfm = update.metadata();

    List<Mutation> mutations = new ArrayList<>(update.getColumnCount());

    // keyspace and table of the row that fired the trigger
    String keyspaceName = cfm.ksName;
    String tableName    = cfm.cfName;
    String keyStr       = "";

    try {
        keyStr = ByteBufferUtil.string(key);
    } catch (CharacterCodingException e) {
        StringWriter errors = new StringWriter();
        e.printStackTrace(new PrintWriter(errors));
        logger.error(errors.toString());
    }

    for (Cell cell : update)
    {
        // Skip the row marker and other empty values, since they lead to an empty key.
        if (cell.value().remaining() > 0)
        {
            CFMetaData other = Schema.instance.getCFMetaData("test","audit");
            CellNameType cnt = other.comparator;

            ByteBuffer auditkey = UUIDType.instance.decompose(UUIDGen.getTimeUUID());

            // create CellName objects for each of the columns in the audit table row we are inserting
            CellName primaryKeyCellName = cnt.makeCellName("primary_key");
            CellName keyspaceCellName = cnt.makeCellName("keyspace_name");
            CellName tableCellName = cnt.makeCellName("table_name");

            try {
                // put the values we want to write to the audit table into ByteBuffer objects
                ByteBuffer ksvalbb,tablevalbb,keyvalbb;
                ksvalbb=ByteBuffer.wrap(keyspaceName.getBytes("UTF8"));
                tablevalbb=ByteBuffer.wrap(tableName.getBytes("UTF8"));
                keyvalbb=ByteBuffer.wrap(keyStr.getBytes("UTF8"));

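                // create the mutation in the audit table's keyspace ("test"),
                // which may differ from the keyspace of the table that fired the trigger
                Mutation mutation = new Mutation("test", auditkey);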

                // get the time which will be needed for the call to mutation.add
                long mutationTime=System.currentTimeMillis();

                // add each of the column values to the mutation
                mutation.add("audit", primaryKeyCellName, keyvalbb,  mutationTime);
                mutation.add("audit", keyspaceCellName,  ksvalbb,  mutationTime);
                mutation.add("audit", tableCellName, tablevalbb,  mutationTime);

                mutations.add(mutation);
            } catch (UnsupportedEncodingException e) {
                StringWriter errors = new StringWriter();
                e.printStackTrace(new PrintWriter(errors));
                logger.error(errors.toString());
            }
        }
    }
    return mutations;
}
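
A possible simplification of the string-to-ByteBuffer step, shown as a standalone sketch that needs only the JDK: getBytes(StandardCharsets.UTF_8) does not declare UnsupportedEncodingException, so the try/catch around the getBytes("UTF8") calls would no longer be needed. The class and method names here are illustrative and not part of the trigger.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class Utf8BufferSketch {
    // Encode a string as UTF-8 bytes wrapped in a ByteBuffer.
    // Using the Charset constant avoids the checked UnsupportedEncodingException.
    static ByteBuffer toBuffer(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
    }

    // Decode a UTF-8 buffer back to a string. duplicate() leaves the
    // caller's buffer position untouched.
    static String fromBuffer(ByteBuffer buf) {
        return StandardCharsets.UTF_8.decode(buf.duplicate()).toString();
    }

    public static void main(String[] args) {
        ByteBuffer tableValue = toBuffer("audit");
        System.out.println(tableValue.remaining());  // 5 bytes
        System.out.println(fromBuffer(tableValue));  // audit
    }
}
```

A buffer built this way can be passed to mutation.add in the same position as ksvalbb, tablevalbb, and keyvalbb above.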