Long-running program in Google App Engine

Posted 2020-03-25 05:54

I have written a Java servlet that reads a file line by line from Google Cloud Storage. Each line is passed to the Prediction API, and once I get the prediction for that text I append it to the original line and store it in another file in Google Cloud Storage.

The source file is a CSV with more than 10,000 records. Since I parse each record individually, pass it to the Prediction API and then write it back to Cloud Storage, the job takes a long time, and App Engine has a 30-second request limit while task queues have their own limitations. Can anyone suggest an option? Re-initiating the program is not an option, because I would not be able to resume the predictions from where I stopped.

Here's my code :

@SuppressWarnings("serial")
public class PredictionWebAppServlet extends HttpServlet {

    private static final String APPLICATION_NAME = "span-test-app";

    static final String MODEL_ID = "span-senti";
    static final String STORAGE_DATA_LOCATION = "/bigdata/training_set/";
    private static HttpTransport httpTransport;
    private static final JsonFactory JSON_FACTORY = JacksonFactory
            .getDefaultInstance();

    public static final String INPUT_BUCKETNAME = "bigdata";
    public static final String INPUT_FILENAME = "abc.csv";

    public static final String OUTPUT_BUCKETNAME = "bigdata";
    public static final String OUTPUT_FILENAME = "def.csv";

    /** Builds a service-account credential for the Prediction API from the P12 key file bundled with the app. */
    private static Credential authorize() throws Exception {

        Credential cr = new GoogleCredential.Builder()
                .setTransport(httpTransport)
                .setJsonFactory(JSON_FACTORY)
                .setServiceAccountId(
                        "878482284233-aacp8vd5297aqak7v5r0f507qr63mab4@developer.gserviceaccount.com")
                .setServiceAccountScopes(
                        Collections.singleton(PredictionScopes.PREDICTION))
                .setServiceAccountPrivateKeyFromP12File(
                        new File(
                                "28617ba6faac0a51eb2208edba85d2e20e6081b4-privatekey.p12"))
                .build();
        return cr;
    }



    /**
     * Reads the input CSV from Cloud Storage line by line, asks the Prediction
     * API for a label for each record and writes the record plus its label to
     * the output file.
     */
    public void doGet(HttpServletRequest req, HttpServletResponse resp)
            throws IOException {
        try {
            httpTransport = GoogleNetHttpTransport.newTrustedTransport();
            Credential credential = authorize();

            Prediction prediction = new Prediction.Builder(httpTransport,
                    JSON_FACTORY, credential).setApplicationName(APPLICATION_NAME)
                    .build();


            // Handles for the input CSV and the output file in Cloud Storage,
            // plus the options (MIME type, ACL, metadata) used when creating
            // the output object.
            GcsService gcsService = GcsServiceFactory.createGcsService();

            GcsFilename filename = new GcsFilename(INPUT_BUCKETNAME, INPUT_FILENAME);
            GcsFilename filename1 = new GcsFilename(OUTPUT_BUCKETNAME,
                    OUTPUT_FILENAME);
            GcsFileOptions options = new GcsFileOptions.Builder()
                    .mimeType("text/html").acl("public-read")
                    .addUserMetadata("myfield1", "my field value").build();


            GcsOutputChannel writeChannel = gcsService.createOrReplace(filename1, options);

            PrintWriter writer = new PrintWriter(Channels.newWriter(writeChannel,
                    "UTF8"));


            GcsInputChannel readChannel = null;
            BufferedReader reader = null;

            readChannel = gcsService.openReadChannel(filename, 0);
            reader = new BufferedReader(Channels.newReader(readChannel, "UTF8"));
            String line;
            String cvsSplitBy = ",";
            String temp_record = "";
            Input input = new Input();
            InputInput inputInput = new InputInput();


            while ((line = reader.readLine()) != null) {

                String[] post = line.split(cvsSplitBy);

                // Ask the Prediction API for a label for the second column
                // of this record.
                inputInput.setCsvInstance(Collections
                        .<Object> singletonList(post[1]));
                input.setInput(inputInput);

                Output output = prediction.trainedmodels()
                        .predict("878482284233", MODEL_ID, input).execute();

                // Rebuild the output record from scratch for every line
                // (otherwise each row would accumulate all previous rows),
                // then append the predicted label as the last column.
                temp_record = "";
                for (int i = 0; i < 10; i++) {
                    temp_record = temp_record + post[i] + ",";
                }
                temp_record = temp_record + output.getOutputLabel();

                writer.println(temp_record);

            }

            writer.flush();
            writer.close();
            reader.close();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2 Answers
来,给爷笑一个 · 2020-03-25 06:02

This sort of thing is exactly what the MapReduce framework is for.
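For what it is worth, a rough sketch of what the per-line work could look like as a mapper for the App Engine MapReduce library is below. It is only an outline: the Mapper base class with its map()/emit() methods, and the GoogleCloudStorageLineInput / MapReduceSpecification plumbing that would feed it lines and collect its output, are taken from the library's examples and may differ between versions; the credential, project number and model id are simply copied from the question.

// Sketch only: Mapper and emit() are assumed from the appengine-mapreduce
// library's examples; verify the exact API against the version you use.
public class PredictLineMapper extends Mapper<byte[], String, String> {

    private static final long serialVersionUID = 1L;

    // Built lazily; the Prediction client is not serializable, so it must
    // not be part of the mapper's serialized state.
    private transient Prediction prediction;

    @Override
    public void map(byte[] lineBytes) {
        try {
            String line = new String(lineBytes, "UTF8");
            String[] post = line.split(",");

            if (prediction == null) {
                // Same service-account setup as authorize() in the question.
                HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
                Credential credential = new GoogleCredential.Builder()
                        .setTransport(transport)
                        .setJsonFactory(JacksonFactory.getDefaultInstance())
                        .setServiceAccountId(
                                "878482284233-aacp8vd5297aqak7v5r0f507qr63mab4@developer.gserviceaccount.com")
                        .setServiceAccountScopes(
                                Collections.singleton(PredictionScopes.PREDICTION))
                        .setServiceAccountPrivateKeyFromP12File(new File(
                                "28617ba6faac0a51eb2208edba85d2e20e6081b4-privatekey.p12"))
                        .build();
                prediction = new Prediction.Builder(transport,
                        JacksonFactory.getDefaultInstance(), credential)
                        .setApplicationName("span-test-app").build();
            }

            InputInput inputInput = new InputInput();
            inputInput.setCsvInstance(Collections.<Object> singletonList(post[1]));
            Input input = new Input();
            input.setInput(inputInput);

            Output output = prediction.trainedmodels()
                    .predict("878482284233", "span-senti", input).execute();

            // Key by the first column; the value is the original line plus
            // its predicted label, which the output stage writes back to
            // Cloud Storage.
            emit(post[0], line + "," + output.getOutputLabel());
        } catch (Exception e) {
            // Let the framework retry this slice.
            throw new RuntimeException(e);
        }
    }
}

The framework shards the input file across many workers and retries failed shards, which also addresses the "cannot resume from where I stopped" concern.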

Bombasti · 2020-03-25 06:23

You are hinting at it yourself.

If you think your job can finish within 10 minutes, you can do it with task queues alone.

If not, you will need to use a combination of task queues and backends. You need to push the work to a backend instance. Take a look at Push Queues and Backends.
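For the task-queue route, the usual pattern is to split the CSV into fixed-size chunks and enqueue one task per chunk, so each task records which range of lines it owns and a failed chunk can be retried without redoing the whole file. A minimal sketch with the Task Queue API follows; the /predict-worker URL, the chunk size and the total line count are assumptions for illustration, and the worker servlet behind that URL would run the question's loop over only its own range of lines:

// Split the job: one task per chunk of the CSV.
void enqueuePredictionChunks(int totalLines) {
    Queue queue = QueueFactory.getDefaultQueue();
    int chunkSize = 200; // assumption: small enough to finish well inside the task deadline

    for (int start = 0; start < totalLines; start += chunkSize) {
        int end = Math.min(start + chunkSize, totalLines);
        queue.add(TaskOptions.Builder
                .withUrl("/predict-worker")                  // hypothetical worker servlet
                .param("startLine", Integer.toString(start))
                .param("endLine", Integer.toString(end))
                .method(TaskOptions.Method.POST));
    }
    // To run these tasks on a backend/module rather than the default
    // front-end, point the queue at it with a <target> element in
    // queue.xml (or set the task's Host header to that module).
}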

UPDATE - with modules instead of backends

Backends are deprecated in favour of modules. A way to do it with modules is to:

  1. convert your app to the modules structure
  2. define a module with manual scaling
  3. handle the "/_ah/start" URL in that module
  4. execute your whole job in the "/_ah/start" handler

Manual scaling instances don't have constraints on how long they may run. You can run "forever" in the "/_ah/start" request if the instance has manual scaling. Hey, you can even start threads if you like, but that should not be necessary for this job. Just run until done.
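To make steps 2-4 concrete, here is a minimal sketch; the module name, instance class and the idea of checkpointing progress are assumptions, not part of the official recipe. The worker module's appengine-web.xml gets manual scaling, web.xml maps a servlet to /_ah/start, and that servlet simply runs the whole job:

// Sketch of the /_ah/start handler for a manual-scaling module.
// The module's appengine-web.xml would contain something like:
//
//   <module>prediction-worker</module>
//   <instance-class>B4</instance-class>                  <!-- assumption -->
//   <manual-scaling><instances>1</instances></manual-scaling>
//
// and web.xml maps this servlet to /_ah/start.
@SuppressWarnings("serial")
public class PredictionStartServlet extends HttpServlet {

    @Override
    public void doGet(HttpServletRequest req, HttpServletResponse resp)
            throws IOException {
        // No request deadline applies here, so the whole CSV can be
        // processed in this single request: run the same read-predict-write
        // loop as in the question's servlet, ideally persisting a checkpoint
        // (e.g. the last processed line number) to the datastore or memcache
        // so a restarted instance can resume instead of starting over.
        resp.setStatus(HttpServletResponse.SC_OK);
    }
}

App Engine sends the /_ah/start request itself when the manual-scaling instance comes up, so the job starts as soon as the module's instance is started.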
