Raptor
A fast and space-efficient pre-filter
All Classes Namespaces Files Functions Variables Macros Pages Concepts
search_partitioned_hibf.hpp
Go to the documentation of this file.
1// --------------------------------------------------------------------------------------------------
2// Copyright (c) 2006-2023, Knut Reinert & Freie Universität Berlin
3// Copyright (c) 2016-2023, Knut Reinert & MPI für molekulare Genetik
4// This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5// shipped with this file and also available at: https://github.com/seqan/raptor/blob/main/LICENSE.md
6// --------------------------------------------------------------------------------------------------
7
13#pragma once
14
15#include <seqan3/search/views/minimiser_hash.hpp>
16
18#include <raptor/contrib/std/chunk_view.hpp>
24
25namespace raptor
26{
27
28template <typename index_t>
29void search_partitioned_hibf(search_arguments const & arguments, index_t && index)
30{
31 seqan3::sequence_file_input<dna4_traits, seqan3::fields<seqan3::field::id, seqan3::field::seq>> fin{
32 arguments.query_file};
33 using record_type = typename decltype(fin)::record_type;
34
36
37 sync_out synced_out{arguments};
38
39 std::vector<std::string> results; // cache results since we are searching multiple hibfs
40
41 raptor::threshold::threshold const thresholder{arguments.make_threshold_parameters()};
42
43 // searching with storing all results in results map
44 auto worker = [&](size_t const start, size_t const extent, bool const output_results)
45 {
46 seqan::hibf::serial_timer local_compute_minimiser_timer{};
47 seqan::hibf::serial_timer local_query_ibf_timer{};
48 seqan::hibf::serial_timer local_generate_results_timer{};
49
50 auto agent = index.ibf().membership_agent();
51
52 std::string result_string{};
53 std::vector<uint64_t> minimiser;
54
55 auto hash_adaptor = seqan3::views::minimiser_hash(arguments.shape,
56 seqan3::window_size{arguments.window_size},
57 seqan3::seed{adjust_seed(arguments.shape_weight)});
58
59 for (size_t pos = start; pos < start + extent; ++pos)
60 {
61 auto const & seq = records[pos].sequence();
62 std::string & result_string = results[pos];
63
64 auto minimiser_view = seq | hash_adaptor | std::views::common;
65 local_compute_minimiser_timer.start();
66 minimiser.assign(minimiser_view.begin(), minimiser_view.end());
67 local_compute_minimiser_timer.stop();
68
69 size_t const minimiser_count{minimiser.size()};
70 size_t const threshold = thresholder.get(minimiser_count);
71
72 local_query_ibf_timer.start();
73 auto & result = agent.membership_for(minimiser, threshold); // Results contains user bin IDs
74 local_query_ibf_timer.stop();
75 local_generate_results_timer.start();
76 for (auto && user_bin_id : result)
77 {
78 result_string += std::to_string(user_bin_id);
79 result_string += ',';
80 }
81
82 if (output_results)
83 {
84 result_string.insert(result_string.begin(), '\t');
85 auto const & id = records[pos].id();
86 result_string.insert(result_string.begin(), id.begin(), id.end());
87
88 if (auto & last_char = result_string.back(); last_char == ',')
89 last_char = '\n';
90 else
91 result_string += '\n';
92
93 synced_out.write(result_string);
94 result_string.clear(); // free memory
95 }
96 local_generate_results_timer.stop();
97 }
98
99 arguments.compute_minimiser_timer += local_compute_minimiser_timer;
100 arguments.query_ibf_timer += local_query_ibf_timer;
101 arguments.generate_results_timer += local_generate_results_timer;
102 };
103
104 for (auto && chunked_records : fin | seqan::stl::views::chunk((1ULL << 20) * 10))
105 {
106 // prefetch the first partition while query IO is done
107 auto cereal_future = std::async(std::launch::async,
108 [&]()
109 {
110 load_index(index, arguments, 0);
111 });
112
113 records.clear();
114 arguments.query_file_io_timer.start();
115 std::ranges::move(chunked_records, std::back_inserter(records));
116 arguments.query_file_io_timer.stop();
117
118 results.resize(records.size());
119
120 cereal_future.get();
121 synced_out.write_header(arguments, index.ibf().ibf_vector[0].hash_function_count());
122
123 assert(arguments.parts > 0);
124 for (int part = 0; part < arguments.parts - 1; ++part)
125 {
126 do_parallel(worker, records.size(), arguments.threads, false /*do not write results*/);
127 arguments.write_timings_to_file();
128 arguments.compute_minimiser_timer = {};
129 arguments.query_ibf_timer = {};
130 arguments.generate_results_timer = {};
131 arguments.load_index_timer = {};
132 load_index(index, arguments, part + 1);
133 }
134
135 do_parallel(worker, records.size(), arguments.threads, true /*write results*/);
136 arguments.write_timings_to_file();
137 }
138}
139
140} // namespace raptor
Provides raptor::adjust_seed.
T async(T... args)
T back(T... args)
T back_inserter(T... args)
T begin(T... args)
Definition threshold.hpp:19
T clear(T... args)
Provides raptor::dna4_traits.
Provides raptor::do_parallel.
T insert(T... args)
Provides raptor::load_index.
T move(T... args)
T resize(T... args)
Provides raptor::sync_out.
Provides raptor::threshold::threshold.
T to_string(T... args)
Hide me