Question: How can you modify 3000pc so the producer stops producing once it fills the queue?
How can you modify 3000pc so the producer stops producing once it fills the queue?
/* 3000pc.c */
2
3
4
5
6
7
#include <errno.h>
#include <semaphore.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
19
/* Sizing constants used by the queue structures below. */
enum {
        QUEUESIZE = 32, /* number of slots in the shared queue array */
        WORDSIZE = 16   /* bytes in each slot's word buffer */
};
22
/* Words the producer picks from at random (see pick_word()). */
const char *wordlist[] = {
        "Alpha",
        "Bravo",
        "Charlie",
        "Delta",
        "Echo",
        "Foxtrot",
        "Golf",
        "Hotel",
        "India",
        "Juliet",
        "Kilo",
        "Lima",
        "Mike",
        "November",
        "Oscar",
        "Papa",
        "Quebec",
        "Romeo",
        "Sierra",
        "Tango",
        "Uniform",
        "Victor",
        "Whiskey",
        "X-ray",
        "Yankee",
        "Zulu",
        "Dash"
};

/*
 * Number of entries in wordlist.  Derived from the array itself so the
 * count cannot drift out of sync when words are added or removed.
 */
const int wordlist_size = (int)(sizeof(wordlist) / sizeof(wordlist[0]));
53
54 typedef struct entry {
55 char word[WORDSIZE];
56 sem_t lock;
57 } entry;
58
59 typedef struct shared {
60 int prod_waiting;
61 int con_waiting;
62 entry queue[QUEUESIZE];
63 int last_produced;
64 int last_consumed;
65 pid_t prod_pid;
66 pid_t con_pid;
67 int prod_count;
68 int con_count;
69 } shared;
70
71
/*
 * Write an error message to stderr, prefixed with "Error: ".
 *
 * error: NUL-terminated description of what went wrong.
 */
void report_error(char *error)
{
        const char *description = error;

        fprintf(stderr, "Error: %s ", description);
}
76
/*
 * Print a usage summary to stderr and terminate with exit status -1.
 *
 * progname: name to show in the usage line (normally argv[0]).  A NULL
 *           pointer is tolerated, since argv[0] is NULL when argc is 0.
 *
 * The three arguments listed match the order main() reads them:
 * argv[1] = event count, argv[2] = producer delay interval,
 * argv[3] = consumer delay interval.
 */
void usage_exit(char *progname)
{
        const char *name = (progname != NULL) ? progname : "3000pc";

        fprintf(stderr,
                "Usage: %s <event count> <prod delay int> <con delay int>\n",
                name);
        exit(-1);
}
84
/*
 * Signal handler installed by producer() for SIGUSR1.  It only logs the
 * event; its purpose is to let the signal interrupt the producer's sleep.
 *
 * NOTE(review): fprintf() is not on the POSIX async-signal-safe list;
 * acceptable for a demo, but worth replacing with write() in real code.
 */
void producer_handler(int the_signal)
{
        if (the_signal != SIGUSR1) {
                fprintf(stderr, "Producer: No handler for for signal %d?! ",
                        the_signal);
                return;
        }

        fprintf(stderr, "Producer received SIGUSR1. ");
}
97
/*
 * Signal handler installed by consumer() for SIGUSR1.  It only logs the
 * event; its purpose is to let the signal interrupt the consumer's sleep.
 *
 * NOTE(review): fprintf() is not on the POSIX async-signal-safe list;
 * acceptable for a demo, but worth replacing with write() in real code.
 */
void consumer_handler(int the_signal)
{
        if (the_signal != SIGUSR1) {
                fprintf(stderr, "Consumer: No handler for for signal %d?! ",
                        the_signal);
                return;
        }

        fprintf(stderr, "Consumer received SIGUSR1. ");
}
109
110 void pick_word(char *word)
111 {
112 int pick;
113
114 pick = random() % wordlist_size;
115
116 strcpy(word, wordlist[pick]);
117 }
118
119 void wait_for_producer(shared *s)
120 {
121 struct timespec delay;
122
123 delay.tv_sec = 100;
124 delay.tv_nsec = 0;
125
126 s->con_waiting = 1;
127
128 while (s->con_waiting) {
129 nanosleep(&delay, NULL);
130 }
131 }
132
133 void wait_for_consumer(shared *s)
134 {
135 struct timespec delay;
136
137 delay.tv_sec = 100;
138 delay.tv_nsec = 0;
139
140 s->prod_waiting = 1;
141
142 while (s->prod_waiting) {
143 nanosleep(&delay, NULL);
144 }
145 }
146
147 void wakeup_consumer(shared *s)
148 {
149 if (s->con_waiting) {
150 s->con_waiting = 0;
151 kill(s->con_pid, SIGUSR1);
152 }
153 }
154
155 void wakeup_producer(shared *s)
156 {
157 if (s->prod_waiting) {
158 s->prod_waiting = 0;
159 kill(s->prod_pid, SIGUSR1);
160 }
161 }
162
/*
 * Print one consumed word on stdout.
 *
 * c: sequence number shown before the word.
 * w: NUL-terminated word to print.
 */
void output_word(int c, char *w)
{
        fprintf(stdout, "Word %d: %s ", c, w);
}
167
168 int queue_word(char *word, shared *s)
169 {
170 entry *e;
171 int current, retval;
172
173 current = (s->last_produced + 1) % QUEUESIZE;
174
175 e = &s->queue[current];
176
177 sem_wait(&e->lock);
178
179 if (e->word[0] != '\0') {
180 /* consumer hasn't consumed this entry yet */
181 sem_post(&e->lock);
182 wait_for_consumer(s);
183 sem_wait(&e->lock);
184 }
185
186 if (e->word[0] != '\0') {
187 fprintf(stderr, "ERROR: No room for producer after waiting! ");
188 retval = -1;
189 goto done;
190 } else {
191 strncpy(e->word, word, WORDSIZE);
192 s->last_produced = current;
193 s->prod_count++;
194 wakeup_consumer(s);
195 retval = 0;
196 goto done;
197 }
198
199 done:
200 sem_post(&e->lock);
201 return retval;
202 }
203
204 int get_next_word(char *word, shared *s)
205 {
206 entry *e;
207 int current, retval;
208
209 current = (s->last_consumed + 1) % QUEUESIZE;
210
211 e = &s->queue[current];
212
213 sem_wait(&e->lock);
214
215 if (e->word[0] == '\0') {
216 /* producer hasn't filled in this entry yet */
217 sem_post(&e->lock);
218 wait_for_producer(s);
219 sem_wait(&e->lock);
220 }
221
222 if (e->word[0] == '\0') {
223 fprintf(stderr, "ERROR: Nothing for consumer after waiting! ");
224 retval = -1;
225 goto done;
226 } else {
227 strncpy(word, e->word, WORDSIZE);
228 e->word[0] = '\0';
229 s->last_consumed = current;
230 s->con_count++;
231 wakeup_producer(s);
232 retval = 0;
233 goto done;
234 }
235
236 done:
237 sem_post(&e->lock);
238 return retval;
239 }
240
241 void producer(shared *s, int event_count, int producer_delay_interval)
242 {
243 char word[WORDSIZE];
244 int i;
245 struct sigaction signal_handler_struct;
246
247 memset (&signal_handler_struct, 0, sizeof(signal_handler_struct));
248 signal_handler_struct.sa_handler = producer_handler;
249
250 if (sigaction(SIGUSR1, &signal_handler_struct, NULL)) {
251 fprintf(stderr, "Producer couldn't register SIGUSR1 handler. ");
252 }
253
254 for (i=0; i < event_count; i++) {
255 pick_word(word);
256 queue_word(word, s);
257 if (producer_delay_interval > 0) {
258 if (i % producer_delay_interval == 0) {
259 sleep(1);
260 }
261 }
262 }
263
264 printf("Producer finished. ");
265 exit(0);
266 }
267
268 void consumer(shared *s, int event_count, int consumer_delay_interval)
269 {
270 char word[WORDSIZE];
271 int i;
272 struct sigaction signal_handler_struct;
273
274 memset (&signal_handler_struct, 0, sizeof(signal_handler_struct));
275 signal_handler_struct.sa_handler = consumer_handler;
276
277 if (sigaction(SIGUSR1, &signal_handler_struct, NULL)) {
278 fprintf(stderr, "Consumer couldn't register SIGUSR1 handler. ");
279 }
280
281 for (i=0; i < event_count; i++) {
282 get_next_word(word, s);
283 output_word(s->con_count, word);
284 if (consumer_delay_interval > 0) {
285 if (i % consumer_delay_interval == 0) {
286 sleep(1);
287 }
288 }
289 }
290
291 printf("Consumer finished. ");
292 exit(0);
293 }
294
295 void init_shared(shared *s)
296 {
297 int i;
298
299 s->con_waiting = 0;
300 s->last_consumed = -1;
301
302 s->prod_waiting = 0;
303 s->last_produced = -1;
304
305 s->prod_pid = -1;
306 s->con_pid = -1;
307
308 s->prod_count = 0;
309 s->con_count = 0;
310
311 for (i=0; i 312 s->queue[i].word[0] = '\0'; 313 /* semaphore is shared between processes, 314 and initial value is 1 (unlocked) */ 315 sem_init(&s->queue[i].lock, 1, 1); 316 } 317 } 318 319 int main(int argc, char *argv[]) 320 { 321 int pid, count, prod_interval, con_interval; 322 323 shared *s; 324 325 srandom(42); 326 327 if (argc < 4) { 328 if (argc < 1) { 329 report_error("no command line"); 330 usage_exit(argv[0]); 331 } else { 332 report_error("Not enough arguments"); 333 usage_exit(argv[0]); 334 } 335 } 336 337 count = atoi(argv[1]); 338 prod_interval = atoi(argv[2]); 339 con_interval = atoi(argv[3]); 340 341 s = (shared *) mmap(NULL, sizeof(shared), 342 PROT_READ|PROT_WRITE, 343 MAP_SHARED|MAP_ANONYMOUS, -1, 0); 344 345 if (s == MAP_FAILED) { 346 report_error(strerror(errno)); 347 } 348 349 init_shared(s); 350 351 pid = fork(); 352 353 if (pid) { 354 /* producer */ 355 s->prod_pid = getpid(); 356 producer(s, count, prod_interval); 357 } else { 358 /* consumer */ 359 s->con_pid = getpid(); 360 consumer(s, count, con_interval); 361 } 362 363 /* This line should never be reached */ 364 return -1; 365 }
Step by Step Solution
There are 3 Steps involved in it
Get step-by-step solutions from verified subject matter experts
