Optimizing network bandwidth over distributed data

Posted 2019-04-23 15:35

I have a distributed/federated database structured as follows:

  1. The databases are spread across three geographic locations ("nodes")
  2. Multiple databases are clustered at each node
  3. The relational databases are a mix of PostgreSQL, MySQL, Oracle, and MS SQL Server; the non-relational databases are either MongoDB or Cassandra
  4. Loose coupling within each node and across the node federation is achieved via RabbitMQ, with each node running a RabbitMQ broker

I am implementing a readonly inter-node aggregation job system for jobs that span the node federation (i.e. for jobs that are not local to a node). These jobs only perform "get" queries - they do not modify the databases. (If the results of the jobs are intended to go into one or more of the databases then this is accomplished by a separate job that is not part of the inter-node job system I am trying to optimize.) My objective is to minimize the network bandwidth required by these jobs (first to minimize the inter-node / WAN bandwidth, then to minimize the intra-node / LAN bandwidth); I assume a uniform cost for each WAN link, and another uniform cost for each LAN link. The jobs are not particularly time-sensitive. I perform some CPU load-balancing within a node but not between nodes.

The amount of data transported across the WAN/LAN for the aggregation jobs is small relative to the amount of database writes that are local to a cluster or to a specific database, so it would not be practical to fully distribute the databases across the federation.

The basic algorithm I use for minimizing network bandwidth is:

  1. Given a job that runs on a set of data that is spread across the federation, the manager node sends a message to each of the other nodes that contains the relevant database queries.
  2. Each node runs its set of queries, compresses the results with gzip, caches them, and sends the compressed sizes to the manager node.
  3. The manager moves to the node containing the plurality of the data (specifically, to the machine within the cluster that has the most data and that has idle cores); it requests the rest of the data from the other two nodes and from the other machines within the cluster, then it runs the job.
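The node-selection step above can be sketched roughly as follows. This is only an illustration: the node names, the `compressed_sizes` mapping, and both function names are hypothetical, and it ignores the per-machine choice within a cluster and the idle-core check.

```python
def pick_execution_node(compressed_sizes):
    """Return the node reporting the largest compressed resultset."""
    return max(compressed_sizes, key=compressed_sizes.get)


def wan_bytes(compressed_sizes, chosen):
    """Bytes that must cross the WAN if the job runs on `chosen`."""
    return sum(size for node, size in compressed_sizes.items() if node != chosen)


# Hypothetical compressed sizes (in bytes) reported by each node in step 2.
sizes = {"node1": 120_000, "node2": 450_000, "node3": 80_000}
chosen = pick_execution_node(sizes)
transfer = wan_bytes(sizes, chosen)
```

Running the job where the plurality of the data already sits means only the two smaller resultsets have to travel.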

When possible the jobs use a divide-and-conquer approach to minimize the amount of data co-location that is needed. For example, if the job needs to compute the sums of all Sales figures across the federation, then each node locally calculates its Sales sums which are then aggregated at the manager node (rather than copying all of the unprocessed Sales data to the manager node). However, sometimes (such as when performing a join between two tables that are located at different nodes) data co-location is needed.
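As a minimal sketch of the Sales example, assuming each node holds its rows as a list of dicts with a hypothetical `sales` field, only one number per node needs to leave the node:

```python
def local_sales_sum(rows):
    """Runs on each node: reduce the local rows to a single partial sum."""
    return sum(row["sales"] for row in rows)


def federated_sales_sum(partial_sums):
    """Runs on the manager node: combine the per-node partial sums."""
    return sum(partial_sums)


# Hypothetical per-node data; in practice each list stays on its own node.
node_rows = {
    "node1": [{"sales": 100}, {"sales": 250}],
    "node2": [{"sales": 40}],
    "node3": [{"sales": 10}, {"sales": 5}, {"sales": 15}],
}
partials = [local_sales_sum(rows) for rows in node_rows.values()]
total = federated_sales_sum(partials)
```

This works for aggregates that can be combined from partial results (sums, counts, min/max); a cross-node join cannot be decomposed this way, which is why co-location is sometimes unavoidable.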

The first thing I did to optimize this was to aggregate the jobs and run the aggregated jobs at ten-minute epochs (the machines are all running NTP, so I can be reasonably certain that "every ten minutes" means the same thing at each node). The goal is for two jobs to be able to share the same data, which reduces the overall cost of transporting the data.

  1. Given two jobs that query the same table, I generate each job's resultset, and then I take the intersection of the two resultsets.
  2. If both jobs are scheduled to run on the same node, then the network transfer cost is calculated as the sum of the sizes of the two resultsets minus the size of their intersection.
  3. The two resultsets are stored to PostgreSQL temporary tables (in the case of relational data) or else to temporary Cassandra columnfamilies / MongoDB collections (in the case of nosql data) at the node selected to run the jobs; the original queries are then performed against the combined resultsets and the data is delivered to the individual jobs. (This step is only performed on combined resultsets; individual resultset data is simply delivered to its job without first being stored on temporary tables/column families/collections.)
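The cost rule in step 2 can be illustrated with Python sets. This sketch assumes rows are hashable tuples and counts cost in rows rather than bytes; the function name and sample rows are made up for the example.

```python
def shared_transfer_cost(result_a, result_b):
    """Rows to ship when two co-located jobs share their overlapping rows."""
    overlap = result_a & result_b
    return len(result_a) + len(result_b) - len(overlap)


# Hypothetical resultsets of (primary_key, value) rows from the same table.
job_a = {(1, "x"), (2, "y"), (3, "z")}
job_b = {(2, "y"), (3, "z"), (4, "w")}
cost_shared = shared_transfer_cost(job_a, job_b)
cost_separate = len(job_a) + len(job_b)
```

The shared cost equals the size of the union of the two resultsets, so the saving is exactly the size of the intersection.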

This results in an improvement to network bandwidth, but I'm wondering if there's a framework/library/algorithm that would improve on this. One option I considered is to cache the resultsets at a node and to account for these cached resultsets when determining network bandwidth (i.e. trying to reuse resultsets across jobs in addition to the current set of pre-scheduled co-located jobs, so that e.g. a job run in one 10-minute epoch can use a cached resultset from a previous 10-minute epoch). However, unless the jobs use the exact same resultsets (i.e. unless they use identical where clauses), I don't know of a general-purpose algorithm that would fill in the gaps in the resultset. For example, if the cached resultset used the clause "where N > 3" and a different job needs the resultset with the clause "where N > 0", what algorithm could I use to determine that I need to take the union of the original resultset with the resultset for the clause "where N > 0 AND N <= 3"? I could try to write my own algorithm to do this, but the result would be a buggy useless mess. I would also need to determine when the cached data is stale. The simplest way to do this is to compare the cached data's timestamp with the last-modified timestamp on the source table and replace all of the data if the timestamp has changed, but ideally I'd want to be able to update only the values that have changed, using per-row or per-chunk timestamps.
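For the narrow case in the example (a range predicate on a single numeric column), the gap can be computed with plain interval arithmetic. The sketch below is not a general-purpose predicate algorithm: it assumes each where clause has already been reduced to one interval `(lo, hi)` meaning `lo < N <= hi`, and the function names are hypothetical.

```python
import math


def missing_ranges(cached, wanted):
    """Return the parts of `wanted` not covered by `cached`.

    Each interval is a (lo, hi) pair meaning lo < N <= hi.
    """
    c_lo, c_hi = cached
    w_lo, w_hi = wanted
    # No overlap at all: the whole wanted range must be fetched.
    if c_hi <= w_lo or c_lo >= w_hi:
        return [wanted]
    gaps = []
    if w_lo < c_lo:
        gaps.append((w_lo, c_lo))  # part below the cached range
    if c_hi < w_hi:
        gaps.append((c_hi, w_hi))  # part above the cached range
    return gaps


def to_where(column, interval):
    """Render an interval as a where-clause fragment."""
    lo, hi = interval
    parts = []
    if lo != -math.inf:
        parts.append(f"{column} > {lo}")
    if hi != math.inf:
        parts.append(f"{column} <= {hi}")
    return " AND ".join(parts) or "TRUE"


cached = (3, math.inf)   # where N > 3
wanted = (0, math.inf)   # where N > 0
gaps = missing_ranges(cached, wanted)
clauses = [to_where("N", gap) for gap in gaps]
```

Predicates over several columns, OR-ed conditions, or joins would need considerably more machinery than this.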

1 Answer
仙女界的扛把子
Reply #2 · 2019-04-23 15:59

I've started to implement my solution to the question.

In order to simplify the intra-node cache and also to simplify CPU load balancing, I'm using a Cassandra database at each database cluster ("Cassandra node") to run the aggregation jobs (previously I was aggregating the local database resultsets by hand) - I'm using the single Cassandra database for the relational, Cassandra, and MongoDB data (the downside is that some relational queries run slower on Cassandra, but this is made up for by the fact that the single unified aggregation database is easier to maintain than the separate relational and non-relational aggregation databases). I am also no longer aggregating jobs in ten minute epochs since the cache makes this algorithm unnecessary.

Each machine in a node refers to a Cassandra columnfamily called Cassandra_Cache_[MachineID] that is used to store the key_ids and column_ids that it has sent to the Cassandra node. The Cassandra_Cache columnfamily consists of a Table column, a Primary_Key column, a Column_ID column, a Last_Modified_Timestamp column, a Last_Used_Timestamp column, and a composite key consisting of the Table|Primary_Key|Column_ID. The Last_Modified_Timestamp column denotes the datum's last_modified timestamp from the source database, and the Last_Used_Timestamp column denotes the timestamp at which the datum was last used/read by an aggregation job. When the Cassandra node requests data from a machine, the machine calculates the resultset and then takes the set difference of the resultset and the table|key|columns that are already in its Cassandra_Cache with a Last_Modified_Timestamp that matches the source database (if the timestamps don't match then the cached data is stale and is updated along with the new Last_Modified_Timestamp). The local machine then sends the set difference to the Cassandra node, adds the set difference to its Cassandra_Cache, and updates the Last_Used_Timestamp on each cached datum that was used to compose the resultset. (A simpler alternative to maintaining a separate timestamp for each table|key|column is to maintain a timestamp for each table|key, but this is less precise and the table|key|column timestamp is not overly complex.) Keeping the Last_Used_Timestamps in sync between Cassandra_Caches only requires that the local machines and remote nodes send the Last_Used_Timestamp associated with each job, since all data within a job uses the same Last_Used_Timestamp.
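The per-machine delta computation might look roughly like this. It is a sketch using in-memory dicts in place of the Cassandra_Cache columnfamily; the function name, the dict layout, and the integer timestamps are assumptions made for the example.

```python
def compute_delta(resultset, cache, job_timestamp):
    """Return only the data the Cassandra node does not already hold.

    resultset: {(table, primary_key, column_id): (value, last_modified)}
    cache:     {(table, primary_key, column_id):
                    {"last_modified": ..., "last_used": ...}}
    """
    delta = {}
    for key, (value, last_modified) in resultset.items():
        entry = cache.get(key)
        if entry is None or entry["last_modified"] != last_modified:
            # Missing or stale: send the datum and record the new timestamp.
            delta[key] = value
            cache[key] = {"last_modified": last_modified,
                          "last_used": job_timestamp}
        else:
            # Already cached and fresh: only refresh the LRU timestamp.
            entry["last_used"] = job_timestamp
    return delta


cache = {
    ("sales", 1, "amount"): {"last_modified": 100, "last_used": 500},
    ("sales", 2, "amount"): {"last_modified": 100, "last_used": 500},
}
resultset = {
    ("sales", 1, "amount"): (9.5, 100),   # cached and fresh
    ("sales", 2, "amount"): (7.0, 180),   # cached but stale
    ("sales", 3, "amount"): (1.0, 150),   # never sent before
}
delta = compute_delta(resultset, cache, job_timestamp=600)
```

Only the stale and the previously unsent datum end up in `delta`; the fresh one costs no bandwidth.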

The Cassandra node updates its resultset with the new data that it receives from within the node and also with the data that it receives from the other nodes. The Cassandra node also maintains a columnfamily that stores the same data that is in each machine's Cassandra_Cache (except for the Last_Modified_Timestamp, which is only needed on the local machine to determine when data is stale), along with a source id indicating if the data came from within the node or from another node - the id distinguishes between the different nodes, but does not distinguish between the different machines within the local node. (Another option is to use a unified Cassandra_Cache rather than using one Cassandra_Cache per machine plus another Cassandra_Cache for the node, but I decided that the added complexity was not worth the space savings.)

Each Cassandra node also maintains a Federated_Cassandra_Cache, which consists of the {Database, Table, Primary_Key, Column_ID, Last_Used_Timestamp} tuples that have been sent from the local node to one of the other two nodes.

When a job comes through the pipeline, each Cassandra node updates its intra-node cache with the local resultsets, and also completes the sub-jobs that can be performed locally (e.g. in a job to sum data between multiple nodes, each node sums its intra-node data in order to minimize the amount of data that needs to be co-located in the inter-node federation) - a sub-job can be performed locally if it only uses intra-node data. The manager node then determines on which node to perform the rest of the job: each Cassandra node can locally compute the cost of sending its resultset to another node by taking the set difference of its resultset and the subset of the resultset that has been cached according to its Federated_Cassandra_Cache, and the manager node minimizes the cost equation ["cost to transport resultset from NodeX" + "cost to transport resultset from NodeY"], where NodeX and NodeY are the two nodes that would not run the job. For example, it costs Node1 {3, 5} to transport its resultset to {Node2, Node3}, it costs Node2 {2, 2} to transport its resultset to {Node1, Node3}, and it costs Node3 {4, 3} to transport its resultset to {Node1, Node2}. Running the job on Node1 costs 2 + 4 = 6 (Node2 to Node1 plus Node3 to Node1), running it on Node2 costs 3 + 3 = 6, and running it on Node3 costs 5 + 2 = 7; therefore the job is run on Node1 with a cost of "6" (Node2 ties at 6 and would be an equally cheap choice).
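The cost minimization can be written out as a small calculation over the example figures. The nested `cost` dict and the function name are illustrative only; in the real system each node would report its own outbound costs to the manager.

```python
# cost[source][target]: cost for `source` to ship its uncached resultset
# to `target`, using the figures from the example.
cost = {
    "Node1": {"Node2": 3, "Node3": 5},
    "Node2": {"Node1": 2, "Node3": 2},
    "Node3": {"Node1": 4, "Node2": 3},
}


def inbound_cost(cost, target):
    """Total cost of shipping every other node's resultset to `target`."""
    return sum(routes[target] for source, routes in cost.items()
               if source != target)


totals = {node: inbound_cost(cost, node) for node in cost}
# min() keeps the first node it sees among equal totals, so ties are
# broken by dict order.
best = min(totals, key=totals.get)
```

With these figures Node1 and Node2 both total 6 and Node3 totals 7; a tie-break rule (here simply dict order) decides between the two cheapest nodes.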

I'm using an LRU eviction policy for each Cassandra node; I was originally using an oldest-first eviction policy because it is simpler to implement and requires fewer writes to the Last_Used_Timestamp column (once per datum update instead of once per datum read), but the implementation of an LRU policy turned out not to be overly complex and the Last_Used_Timestamp writes did not create a bottleneck. When a Cassandra node drops to 20% free space it evicts data until it reaches 30% free space, hence each eviction is approximately the size of 10% of the total space available. The node maintains two timestamps: the timestamp of the last-evicted intra-node data, and the timestamp of the last-evicted inter-node / federated data. Due to the increased latency of inter-node communication relative to that of intra-node communication, the goal of the eviction policy is to have 75% of the cached data be inter-node data and 25% of the cached data be intra-node data, which can be quickly approximated by having 25% of each eviction be inter-node data and 75% of each eviction be intra-node data. Eviction works as follows:

// LRU: evict the least recently used local (intra-node) data, one hour at a time
while(evicted_local_data_size < 7.5% of total space available) {
    evict local data with Last_Used_Timestamp < 
        (last_evicted_local_timestamp += 1 hour)
    update evicted_local_data_size with evicted data
}

// Same loop for federated (inter-node) data, with the smaller 2.5% target
while(evicted_federated_data_size < 2.5% of total space available) {
    evict federated data with Last_Used_Timestamp < 
        (last_evicted_federated_timestamp += 1 hour)
    update evicted_federated_data_size with evicted data
}
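One of the two loops could be sketched in Python as below, assuming an in-memory list of cache entries and an LRU policy keyed on the last-used timestamp. The entry layout and function name are hypothetical, and the `and kept` guard (stop when nothing is left to evict) is an addition that the pseudocode does not have.

```python
from datetime import datetime, timedelta


def evict_until(entries, cutoff, target_bytes, step=timedelta(hours=1)):
    """Advance `cutoff` hour by hour, evicting entries last used before it.

    entries: list of {"last_used": datetime, "size": int}
    Returns (evicted, kept, new_cutoff).
    """
    evicted, kept = [], list(entries)
    evicted_bytes = 0
    while evicted_bytes < target_bytes and kept:
        cutoff += step
        still_kept = []
        for entry in kept:
            if entry["last_used"] < cutoff:
                evicted.append(entry)
                evicted_bytes += entry["size"]
            else:
                still_kept.append(entry)
        kept = still_kept
    return evicted, kept, cutoff


t0 = datetime(2019, 4, 23, 12, 0)
entries = [
    {"last_used": t0 + timedelta(minutes=30), "size": 10},
    {"last_used": t0 + timedelta(minutes=90), "size": 10},
    {"last_used": t0 + timedelta(hours=5), "size": 10},
]
evicted, kept, new_cutoff = evict_until(entries, cutoff=t0, target_bytes=15)
```

The same function would be called twice per eviction round, once for local data with the 7.5% target and once for federated data with the 2.5% target.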

Evicted data is not permanently deleted until eviction acknowledgments have been received from the machines within the node and from the other nodes.

The Cassandra node then sends a notification to the machines within its node indicating what the new last_evicted_local_timestamp is. The local machines update their Cassandra_Caches to reflect the new timestamp, and send a notification to the Cassandra node when this is complete; when the Cassandra node has received notifications from all local machines then it permanently deletes the evicted local data. The Cassandra node also sends a notification to the remote nodes with the new last_evicted_federated_timestamp; the other nodes update their Federated_Cassandra_Caches to reflect the new timestamp, and the Cassandra node permanently deletes the evicted federated data when it receives notifications from each node (the Cassandra node keeps track of which node a piece of data came from, so after receiving an eviction acknowledgment from NodeX the node can permanently delete the evicted NodeX data before receiving an eviction acknowledgment from NodeY). Until all machines/nodes have sent their notifications, the Cassandra node uses the cached evicted data in its queries if it receives a resultset from a machine/node that has not evicted its old data. For example, the Cassandra node has a local Table|Primary_Key|Column_ID datum that it has evicted, and meanwhile a local machine (which has not processed the eviction request) has not included the Table|Primary_Key|Column_ID datum in its resultset because it thinks that the Cassandra node already has the datum in its cache; the Cassandra node receives the resultset from the local machine, and because the local machine has not acknowledged the eviction request the Cassandra node includes the cached evicted datum in its own resultset.
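The deferred deletion for federated data can be modelled as a small bookkeeping structure. This is only a sketch of the idea: the class, its method names, and the in-memory sets are hypothetical, and it covers only the per-source-node case (local data would instead wait for acknowledgments from all local machines).

```python
class PendingEvictions:
    """Evicted federated data kept readable until its source node acknowledges."""

    def __init__(self, evicted_by_source):
        # {source_node_id: set of (table, primary_key, column_id) keys}
        self._pending = {src: set(keys)
                         for src, keys in evicted_by_source.items()}

    def acknowledge(self, source_id):
        """The source has updated its cache; return keys now safe to delete."""
        return self._pending.pop(source_id, set())

    def is_still_readable(self, source_id, key):
        """True while the evicted datum must still be served from the cache."""
        return key in self._pending.get(source_id, set())


pending = PendingEvictions({
    "NodeX": {("sales", 1, "amount")},
    "NodeY": {("sales", 2, "amount")},
})
deleted = pending.acknowledge("NodeX")
```

After NodeX acknowledges, its evicted data can be deleted immediately, while the NodeY datum stays readable in case NodeY sends a resultset that still assumes it is cached.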
