Get Knex.js transactions working with ES7 async/aw

2020-02-08 16:06发布

问题:

I'm trying to couple ES7's async/await with knex.js transactions.

Although I can easily play around with non-transactional code, I'm struggling to get transactions working properly using the aforementioned async/await structure.

I'm using this module to simulate async/await

Here's what I currently have:

Non-transactional version:

works fine but is not transactional

app.js

// assume `db` is a knex instance

app.post("/user", async((req, res) => {
  const data = {
   idUser: 1,
   name: "FooBar"
  }

  try {
    const result = await(user.insert(db, data));
    res.json(result);
  } catch (err) {
    res.status(500).json(err);
  }
}));

user.js

insert: async (function(db, data) {
  // there's no need for this extra call but I'm including it
  // to see example of deeper call stacks if this is answered

  const idUser =  await(this.insertData(db, data));
  return {
    idUser: idUser
  }
}),

insertData: async(function(db, data) {
  // if any of the following 2 fails I should be rolling back

  const id = await(this.setId(db, idCustomer, data));
  const idCustomer = await(this.setData(db, id, data));

  return {
    idCustomer: idCustomer
  }
}),

// DB Functions (wrapped in Promises)

setId: function(db, data) {
  return new Promise(function (resolve, reject) {
    db.insert(data)
    .into("ids")
    .then((result) => resolve(result)
    .catch((err) => reject(err));
  });
},

setData: function(db, id, data) {
  data.id = id;

  return new Promise(function (resolve, reject) {
    db.insert(data)
    .into("customers")
    .then((result) => resolve(result)
    .catch((err) => reject(err));
  });
}

Attempt to make it transactional

user.js

// Start transaction from this call

insert: async (function(db, data) {
 const trx = await(knex.transaction());
 const idCustomer =  await(user.insertData(trx, data));

 return {
    idCustomer: idCustomer
  }
}),

it seems that await(knex.transaction()) returns this error:

[TypeError: container is not a function]

回答1:

Async/await is based around promises, so it looks like you'd just need to wrap all the knex methods to return "promise compatible" objects.

Here is a description on how you can convert arbitrary functions to work with promises, so they can work with async/await:

Trying to understand how promisification works with BlueBird

Essentially you want to do this:

var transaction = knex.transaction;
knex.transaction = function(callback){ return knex.transaction(callback); }

This is because "async/await requires the either a function with a single callback argument, or a promise", whereas knex.transaction looks like this:

function transaction(container, config) {
  return client.transaction(container, config);
}

Alternatively, you can create a new async function and use it like this:

async function transaction() {
  return new Promise(function(resolve, reject){
    knex.transaction(function(error, result){
      if (error) {
        reject(error);
      } else {
        resolve(result);
      }
    });
  });
}

// Start transaction from this call

insert: async (function(db, data) {
 const trx = await(transaction());
 const idCustomer =  await(person.insertData(trx, authUser, data));

 return {
    idCustomer: idCustomer
  }
})

This may be useful too: Knex Transaction with Promises

(Also note, I'm not familiar with knex's API, so not sure what the params are passed to knex.transaction, the above ones are just for example).



回答2:

I couldn't find a solid answer for this anywhere (with rollbacks and commits) so here's my solution.

First you need to "Promisify" the knex.transaction function. There are libraries for this, but for a quick example I did this:

const promisify = (fn) => new Promise((resolve, reject) => fn(resolve));

This example creates a blog post and a comment, and rolls back both if there's an error with either.

const trx = await promisify(db.transaction);

try {
  const postId = await trx('blog_posts')
  .insert({ title, body })
  .returning('id'); // returns an array of ids

  const commentId = await trx('comments')
  .insert({ post_id: postId[0], message })
  .returning('id'); 

  await trx.commit();
} catch (e) {
  await trx.rollback();
}


回答3:

I think I have found a more elegant solution to the problem.

Borrowing from the knex Transaction docs, I will contrast their promise-style with the async/await-style that worked for me.

Promise Style

var Promise = require('bluebird');

// Using trx as a transaction object:
knex.transaction(function(trx) {

  var books = [
    {title: 'Canterbury Tales'},
    {title: 'Moby Dick'},
    {title: 'Hamlet'}
  ];

  knex.insert({name: 'Old Books'}, 'id')
    .into('catalogues')
    .transacting(trx)
    .then(function(ids) {
      return Promise.map(books, function(book) {
        book.catalogue_id = ids[0];

        // Some validation could take place here.

        return knex.insert(book).into('books').transacting(trx);
      });
    })
    .then(trx.commit)
    .catch(trx.rollback);
})
.then(function(inserts) {
  console.log(inserts.length + ' new books saved.');
})
.catch(function(error) {
  // If we get here, that means that neither the 'Old Books' catalogues insert,
  // nor any of the books inserts will have taken place.
  console.error(error);
});

async/await style

var Promise = require('bluebird'); // import Promise.map()

// assuming knex.transaction() is being called within an async function
const inserts = await knex.transaction(async function(trx) {

  var books = [
    {title: 'Canterbury Tales'},
    {title: 'Moby Dick'},
    {title: 'Hamlet'}
  ];

  const ids = await knex.insert({name: 'Old Books'}, 'id')
    .into('catalogues')
    .transacting(trx);

  const inserts = await Promise.map(books, function(book) {
        book.catalogue_id = ids[0];

        // Some validation could take place here.

        return knex.insert(book).into('books').transacting(trx);
      });
    })
  await trx.commit(inserts); // whatever gets passed to trx.commit() is what the knex.transaction() promise resolves to.
})

The docs state:

Throwing an error directly from the transaction handler function automatically rolls back the transaction, same as returning a rejected promise.

It seems that the transaction callback function is expected to return either nothing or a Promise. Declaring the callback as an async function means that it returns a Promise.

One advantage of this style is that you don't have to call the rollback manually. Returning a rejected Promise will trigger the rollback automatically.

Make sure to pass any results you want to use elsewhere to the final trx.commit() call.

I have tested this pattern in my own work and it works as expected.



回答4:

For those who come in 2019.

After I updated Knex to version 0.16.5. sf77's answer doesn't work anymore due to the change in Knex's transaction function:

transaction(container, config) {
  const trx = this.client.transaction(container, config);
  trx.userParams = this.userParams;
  return trx;
}

Solution

Keep sf77's promisify function:

const promisify = (fn) => new Promise((resolve, reject) => fn(resolve));

Update trx

from

const trx = await promisify(db.transaction);

to

const trx =  await promisify(db.transaction.bind(db));


回答5:

Adding to sf77's excellent answer, I implemented this pattern in TypeScript for adding a new user where you need to do the following in 1 transaction:

  1. creating a user record in the USER table
  2. creating a login record in the LOGIN table

public async addUser(user: User, hash: string): Promise<User> {

	//transform knex transaction such that can be used with async-await
	const promisify = (fn: any) => new Promise((resolve, reject) => fn(resolve));
	const trx: knex.Transaction  = <knex.Transaction> await promisify(db.transaction);

	try {
		let users: User [] = await trx
			.insert({
				name: user.name,
				email: user.email,
				joined: new Date()})
			.into(config.DB_TABLE_USER)
			.returning("*")

		await trx
			.insert({
				email: user.email,
				hash
			}).into(config.DB_TABLE_LOGIN)
			.returning("email")
		await trx.commit();
		return Promise.resolve(users[0]);
	}
	catch(error) { 
		await trx.rollback;
		return Promise.reject("Error adding user: " + error) 
	}
}